# -*- coding: utf-8 -*-
# Author: LKouadio <etanoyau@gmail.com>
# License: BSD-3-Clause
"""
Functions to load sample datasets included with the ``fusionlab``
package, suitable for demonstrating and testing forecasting models
and utilities. Datasets are returned as pandas DataFrames or structured
Bunch objects.
"""
from __future__ import annotations
import os
import textwrap
import warnings
import joblib
from pathlib import Path
import pandas as pd
import numpy as np
from typing import (
Optional,
Union,
Tuple,
List,
Dict,
Any
)
from sklearn.preprocessing import (
MinMaxScaler,
StandardScaler,
OneHotEncoder
)
from .._fusionlog import fusionlog
from ..api.bunch import XBunch
from ._property import (
get_data,
download_file_if,
RemoteMetadata,
FLAB_DMODULE,
FLAB_REMOTE_DATA_URL
)
from ._config import CITY_CONFIGS
logger = fusionlog().get_fusionlab_logger(__name__)
# --- Metadata Definition ---
# The two city files share the same remote location and local data
# module and differ only by file name, so both descriptors are built
# from one template (Zhongshan first, then Nansha).
_ZHONGSHAN_METADATA, _NANSHA_METADATA = (
    RemoteMetadata(
        file=csv_name,
        url=FLAB_REMOTE_DATA_URL,
        checksum=None,  # TODO: Add checksum
        descr_module=None,
        data_module=FLAB_DMODULE,
    )
    for csv_name in ('zhongshan_2000.csv', 'nansha_2000.csv')
)

# Register each descriptor on its city configuration so the loaders
# below can look it up through CITY_CONFIGS[<city>]['metadata'].
CITY_CONFIGS["zhongshan"]['metadata'] = _ZHONGSHAN_METADATA
CITY_CONFIGS["nansha"]['metadata'] = _NANSHA_METADATA

# Public API of this module.
__all__ = [
    "fetch_zhongshan_data",
    "fetch_nansha_data",
    "load_processed_subsidence_data",
    "load_subsidence_pinn_data",
]
[docs]
def load_subsidence_pinn_data(
    data_name: str = 'zhongshan',
    strategy: str = "load",
    n_samples: Optional[int] = None,
    include_coords: bool = True,
    include_target: bool = True,
    encode_categoricals: bool = True,
    scale_numericals: bool = True,
    scaler_type: str = "minmax",
    as_frame: bool = False,
    data_home: Optional[str] = None,
    use_cache: bool = True,
    save_cache: bool = False,
    cache_suffix: str = "",
    augment_data: bool = False,
    augment_mode: str = 'both',
    group_by_cols: Optional[List[str]] = None,
    time_col: Optional[str] = None,
    value_cols_interpolate: Optional[List[str]] = None,
    feature_cols_augment: Optional[List[str]] = None,
    interpolation_config: Optional[Dict[str, Any]] = None,
    feature_config: Optional[Dict[str, Any]] = None,
    target_name: Optional[str] = None,
    interpolate_target: bool = False,
    coordinate_precision: Optional[int] = 4,
    year_range: Union[Tuple[float, float], None] = None,
    coords_range: Union[
        Tuple[Tuple[float, float], Tuple[float, float]],
        None] = None,
    vars_range: Optional[Dict[str, Any]] = None,
    verbose: int = 1,
) -> Union[pd.DataFrame, XBunch]:
    # NOTE: the public NumPy-style docstring of this loader is attached
    # right after the definition via ``load_subsidence_pinn_data.__doc__``.
    #
    # Pipeline: validate city -> resolve raw-data path -> try processed
    # cache -> load or generate raw data -> preprocess (fresh data only)
    # -> optional augmentation -> return DataFrame or XBunch.
    from ..utils.geo_utils import augment_city_spatiotemporal_data
    from ..utils.geo_utils import generate_dummy_pinn_data

    # Normalise once so that validation, the config lookup and the
    # city-specific branch in dummy generation all use the same key.
    city_key = data_name.lower()
    if city_key not in CITY_CONFIGS:
        raise ValueError(
            f"Unknown data_name: '{data_name}'. "
            f"Choose from {list(CITY_CONFIGS.keys())}."
        )
    cfg = CITY_CONFIGS[city_key]
    metadata = cfg["metadata"]

    # --- Step 0 - Validate user-provided data_home path ---
    data_path_resolved, metadata = _resolve_data_path(
        data_home, metadata=metadata, data_name=data_name,
        strategy=strategy, verbose=verbose,
    )

    # --- Caching Logic ---
    cache_dir = get_data(data_home)
    # Include data_name in cache filename for uniqueness
    base_name = f"{data_name}_{os.path.splitext(metadata.file)[0]}"
    proc_fname = f"{base_name}_processed{cache_suffix}.joblib"
    proc_fpath = os.path.join(cache_dir, proc_fname)

    df = None
    encoder_info: Dict[str, Any] = {}
    scaler_info: Dict[str, Any] = {}

    if use_cache:
        try:
            # SECURITY: joblib.load unpickles arbitrary objects; only use
            # cache directories whose contents are trusted.
            cached = joblib.load(proc_fpath)
            if isinstance(cached, dict) and "data" in cached:
                df = cached["data"]
                encoder_info = cached.get("encoder_info", {})
                scaler_info = cached.get("scaler_info", {})
                if verbose >= 1:
                    logger.info(f"Loaded cached data from: {proc_fpath}")
            else:  # Cache file is invalid
                if verbose >= 1:
                    logger.info(
                        f"Cache file {proc_fpath} is invalid or malformed."
                    )
                df = None  # Force reload/generation
        except FileNotFoundError:
            if verbose >= 1:
                logger.info(f"No cache file found at: {proc_fpath}")
        except Exception as e:
            # Best effort: a broken cache must never block a fresh run.
            warnings.warn(
                f"Error loading cache ({proc_fpath}: {e}); will reprocess."
            )
            df = None  # Force reload/generation

    # --- Data Loading / Generation ---
    # Everything in this branch runs only when no valid processed cache
    # was found; cached frames are already preprocessed and are used as-is.
    if df is None:
        if strategy == "generate":
            pass  # df stays None, which triggers dummy generation below
        elif strategy == "load":
            if not os.path.exists(data_path_resolved):
                raise FileNotFoundError(
                    f"Data file '{data_path_resolved}' not found and "
                    "strategy is 'load'."
                )
            if verbose >= 1:
                logger.info(
                    f"Loading raw data from: {data_path_resolved}"
                )
            df = pd.read_csv(data_path_resolved)
        elif strategy == "fallback":
            try:
                if verbose >= 1:
                    logger.info(
                        f"Attempting to load from: {data_path_resolved}"
                    )
                df = pd.read_csv(data_path_resolved)
            except Exception:
                # Any read failure (missing, unreadable, malformed file)
                # deliberately falls back to synthetic data.
                if verbose >= 1:
                    logger.info(
                        f"Failed to load '{data_path_resolved}', "
                        "generating dummy data."
                    )
                df = None  # Trigger dummy generation
        else:
            raise ValueError(
                f"Invalid strategy '{strategy}'. Choose 'load', "
                "'generate', or 'fallback'."
            )

        if df is None:  # Generate dummy data
            n_samples = n_samples or 500_000
            if verbose >= 1:
                logger.info(
                    f"Generating {n_samples} dummy samples "
                    f"for {data_name}..."
                )
            dummy_data_dict = generate_dummy_pinn_data(
                n_samples=n_samples,
                year_range=year_range,
                coords_range=coords_range,
                vars_range=vars_range,
            )
            # Add city-specific categorical and numerical features
            for cat_col in cfg["categorical_cols"]:
                # Simplified dummy categories
                dummy_data_dict[cat_col] = np.random.choice(
                    [f"{cat_col}_A", f"{cat_col}_B", f"{cat_col}_C"],
                    size=n_samples
                )
            for num_col in cfg["numerical_main"]:
                if num_col not in dummy_data_dict:
                    dummy_data_dict[num_col] = np.random.rand(n_samples)
            # Compare the normalised key so 'Nansha' / 'NANSHA' also get
            # the dedicated soil-thickness range.
            if city_key == 'nansha' and 'soil_thickness' in cfg[
                "numerical_main"
            ]:
                dummy_data_dict['soil_thickness'] = np.random.uniform(
                    1, 10, n_samples
                )
            df = pd.DataFrame(dummy_data_dict)

        if verbose >= 1:
            logger.info(f"Data loaded/generated. Initial shape: {df.shape}")

        # --- Preprocessing ---
        # Ensure essential columns exist
        essential_for_core_processing = [
            cfg["time_col"],
            cfg["lon_col"],
            cfg["lat_col"],
            cfg["subsidence_col"],
            cfg["gwl_col"]
        ]
        missing_essentials = [
            c for c in essential_for_core_processing if c not in df.columns
        ]
        if missing_essentials:
            raise ValueError(
                f"DataFrame for {data_name} is missing essential columns: "
                f"{missing_essentials}"
            )
        df = df.dropna(subset=essential_for_core_processing).copy()

        # Datetime conversion
        time_source_col = cfg["time_col"]
        dt_col = cfg["dt_col_name"]
        try:
            raw_time = df[time_source_col]
            # Heuristic for integer-like years. Vectorised comparison is
            # equivalent to the per-row ``1900 < x < 2100`` check.
            looks_like_year = (
                pd.api.types.is_numeric_dtype(raw_time)
                and bool(((raw_time > 1900) & (raw_time < 2100)).all())
            )
            if looks_like_year:
                df[dt_col] = pd.to_datetime(raw_time, format="%Y")
            else:
                df[dt_col] = pd.to_datetime(raw_time)
        except Exception as e:
            # Chain the cause so the underlying parsing error stays visible.
            raise ValueError(
                f"Error parsing '{cfg['time_col']}' to datetime: {e}"
            ) from e
        df = df.dropna(subset=[dt_col])
        if verbose >= 2:
            logger.debug(
                f"Shape after datetime conversion & NA drop: {df.shape}"
            )

        # One-Hot Encode
        current_encoder_info = {
            "columns": {}, "names": {}, "encoder_instance": None
        }
        if encode_categoricals:
            cats_to_encode = [
                c for c in cfg["categorical_cols"] if c in df.columns
            ]
            if cats_to_encode:
                if verbose >= 1:
                    logger.info(f"One-Hot encoding: {cats_to_encode}")
                encoder = OneHotEncoder(
                    sparse_output=False,
                    handle_unknown='ignore',
                    dtype=np.float32
                )
                enc_data = encoder.fit_transform(df[cats_to_encode])
                ohe_cols = encoder.get_feature_names_out(cats_to_encode)
                enc_df = pd.DataFrame(
                    enc_data,
                    columns=ohe_cols,
                    index=df.index
                )
                # Replace the raw categorical columns by their encoding.
                df = df.drop(columns=cats_to_encode)
                df = pd.concat([df, enc_df], axis=1)
                for i, col_cat in enumerate(cats_to_encode):
                    current_encoder_info["columns"][col_cat] = [
                        name for name in ohe_cols
                        if name.startswith(col_cat + "_")
                    ]
                    current_encoder_info["names"][col_cat] = list(
                        encoder.categories_[i]
                    )
                current_encoder_info["encoder_instance"] = encoder
                encoder_info = current_encoder_info
            elif verbose >= 2:
                logger.debug("No categorical columns found for encoding.")
        elif verbose >= 1:
            logger.info("Skipping categorical encoding.")

        # Numerical Time Coordinate (must be done AFTER all rows are fixed)
        # Fractional year: year + elapsed share of that (leap-aware) year.
        df[time_source_col + "_numeric"] = (
            df[dt_col].dt.year
            + (df[dt_col].dt.dayofyear - 1)
            / (365 + df[dt_col].dt.is_leap_year.astype(int))
        )

        # Scale Numerical Features
        current_scaler_info = {"columns": [], "scaler_instance": None}
        if scale_numericals:
            # Scale only the main numerical features, not coords, time,
            # or targets.
            cols_to_scale = [
                c for c in cfg["numerical_main"] if c in df.columns
            ]
            if cols_to_scale:
                if verbose >= 1:
                    logger.info(
                        f"Scaling numerical features: {cols_to_scale} "
                        f"with {scaler_type} scaler."
                    )
                if scaler_type == "minmax":
                    scaler = MinMaxScaler()
                elif scaler_type == "standard":
                    scaler = StandardScaler()
                else:
                    raise ValueError(f"Unknown scaler_type: {scaler_type}")
                df[cols_to_scale] = scaler.fit_transform(df[cols_to_scale])
                current_scaler_info["columns"] = cols_to_scale
                current_scaler_info["scaler_instance"] = scaler
                scaler_info = current_scaler_info
            elif verbose >= 2:
                logger.debug(
                    "No numerical features found/specified for scaling."
                )
        elif verbose >= 1:
            logger.info("Skipping numerical scaling.")

        # Save to cache if enabled
        if save_cache:
            os.makedirs(cache_dir, exist_ok=True)
            save_obj = {
                "data": df,
                "encoder_info": encoder_info,
                "scaler_info": scaler_info
            }
            try:
                joblib.dump(save_obj, proc_fpath)
                if verbose >= 1:
                    logger.info(
                        f"Saved processed cache to: {proc_fpath}"
                    )
            except Exception as e:
                # A failed cache write should not lose the processed data.
                warnings.warn(
                    f"Failed to save cache to {proc_fpath}: {e}"
                )

    # --- Data Augmentation Step ---
    if augment_data:
        if verbose >= 1:
            logger.info(
                f"Applying data augmentation (mode: {augment_mode})..."
            )
        # Set defaults for augmentation parameters if not provided by user
        aug_group_cols = group_by_cols or [
            cfg["lon_col"], cfg["lat_col"]
        ]
        aug_time_col = time_col or cfg["time_col"]  # Original time col
        _aug_target_name = target_name or cfg["subsidence_col"]

        # Work on a copy: appending the target below must never mutate
        # the caller's list or the shared CITY_CONFIGS default list.
        if value_cols_interpolate is None:
            aug_val_cols_interp = list(
                cfg["default_value_cols_interpolate"]
            )
        else:
            aug_val_cols_interp = list(value_cols_interpolate)
        # Conditionally add target to interpolation
        if (
            interpolate_target
            and _aug_target_name not in aug_val_cols_interp
        ):
            aug_val_cols_interp.append(_aug_target_name)

        if feature_cols_augment is None:
            # Ensure target is not in noise augmentation list by default
            aug_feat_cols_noise = [
                c for c in cfg["default_feature_cols_augment"]
                if c != _aug_target_name
            ]
        else:
            aug_feat_cols_noise = feature_cols_augment

        interp_conf = interpolation_config or {
            'freq': 'AS', 'method': 'linear'
        }
        aug_conf = feature_config or {
            'noise_level': 0.01, 'noise_type': 'gaussian'
        }
        df = augment_city_spatiotemporal_data(
            df=df,
            city=data_name,  # Pass city to the underlying augmentation
            mode=augment_mode,
            group_by_cols=aug_group_cols,
            time_col=aug_time_col,  # Original time col for datetime ops
            value_cols_interpolate=aug_val_cols_interp,
            feature_cols_augment=aug_feat_cols_noise,
            interpolation_config=interp_conf,
            augmentation_config=aug_conf,
            target_name=_aug_target_name,
            interpolate_target=interpolate_target,
            verbose=verbose > 1,  # Pass down a boolean verbose
            coordinate_precision=coordinate_precision,
            # savefile parameter is not used here, augmentation in-memory
        )
        if verbose >= 1:
            logger.info(f"Data augmentation complete. New shape: {df.shape}")

    if as_frame:
        return df.copy()

    # --- Build XBunch for output ---
    # Features are every column except the datetime helper and the two
    # target columns (subsidence and groundwater level).
    excluded_from_features = [
        cfg["dt_col_name"],
        cfg["subsidence_col"],
        cfg["gwl_col"]
    ]
    feature_cols = [
        c for c in df.columns if c not in excluded_from_features
    ]
    if not include_coords:
        feature_cols = [
            c for c in feature_cols
            if c not in [cfg["lon_col"], cfg["lat_col"]]
        ]

    target_cols_bunch: List[str] = []
    if include_target:
        target_cols_bunch = [cfg["subsidence_col"], cfg["gwl_col"]]
        # Ensure target columns exist before trying to access them
        missing_targets = [
            tc for tc in target_cols_bunch if tc not in df.columns
        ]
        if missing_targets:
            logger.warning(
                f"Target columns {missing_targets} not found in DataFrame "
                "for XBunch. Target array will be None."
            )
            target_array = None
            target_cols_bunch = [
                tc for tc in target_cols_bunch
                if tc in df.columns
            ]
        else:
            target_array = df[target_cols_bunch].values
    else:
        target_array = None

    # Always 2-D, including for an empty frame or an empty feature list.
    data_array = (
        df[feature_cols].values
        if feature_cols
        else np.empty((len(df), 0))
    )

    descr = (
        f"Processed {data_name.capitalize()} PINN data.\n"
        f"Load Strategy: {strategy}.\n"
        f"Cache Used: {'Yes' if use_cache else 'No'}, "
        f"Cache Path: {proc_fpath if use_cache else 'N/A'}.\n"
        f"Categorical Encoding: {'Applied' if encode_categoricals else 'Skipped'}.\n"
        f"Numerical Scaling: {scaler_type if scale_numericals else 'Skipped'}.\n"
        f"Augmentation: {'Applied' if augment_data else 'None'} "
        f"(Mode: {augment_mode if augment_data else 'N/A'}).\n"
        f"Rows: {len(df)}, Features: {len(feature_cols)} "
        f"(in 'data' array).\n"
        f"Targets: {target_cols_bunch if include_target else 'None'}.\n"
        f"Coordinate Precision: {coordinate_precision} decimal places.\n"
        f"Time Column (numeric): {cfg['time_col'] + '_numeric'}."
    )

    bunch_dict: Dict[str, Any] = {
        "frame": df.copy(),  # Return a copy
        "data": data_array,
        "feature_names": feature_cols,
        "target_names": target_cols_bunch,
        "target": target_array,
        "DESCR": descr,
        "encoder_info": encoder_info,  # From cache or fresh processing
        "scaler_info": scaler_info  # From cache or fresh processing
    }
    if include_coords:
        if cfg["lon_col"] in df.columns:
            bunch_dict["longitude"] = df[cfg["lon_col"]].values
        if cfg["lat_col"] in df.columns:
            bunch_dict["latitude"] = df[cfg["lat_col"]].values
    return XBunch(**bunch_dict)
load_subsidence_pinn_data.__doc__=r"""
Load and preprocess subsidence‐focused PINN data for Zhongshan or Nansha.
This function handles data retrieval (from local CSV or remote),
optional dummy‐data generation, caching, preprocessing (datetime
conversion, one‐hot encoding, numeric scaling), and optional
spatio‐temporal augmentation. When `as_frame=False`, it
returns an XBunch object suitable for downstream modeling.
Parameters
----------
data_name : str, default='zhongshan'
Which city dataset to load. Supported values: 'zhongshan',
'nansha'. Case‐insensitive. Used to select city‐specific
metadata (file names, feature lists, etc.).
strategy : {'load', 'generate', 'fallback'}, default='load'
- 'load': strictly attempt to read the CSV from a local or
downloaded location; raise an error if not found.
- 'generate': skip CSV loading and always generate randomized
“dummy” data matching the schema.
- 'fallback': attempt CSV load; if loading fails (file missing
or corrupted), generate dummy data instead.
n_samples : int or None, default=None
Number of dummy rows to generate when `strategy` is 'generate'
or when generation is triggered under 'fallback'. If None,
defaults to 500 000 samples.
include_coords : bool, default=True
If True, include `'longitude'` and `'latitude'` arrays in the
returned XBunch (under keys `"longitude"` and `"latitude"`).
If False, drop those coordinate columns from the feature set.
include_target : bool, default=True
If True, include subsidence and GWL as targets in the XBunch
(`"target_names"` and `"target"`). If False, return an XBunch
with no `"target"` array.
encode_categoricals : bool, default=True
If True, One‐Hot Encode any city‐specific categorical columns
(e.g., `'geology'`, `'density_tier'`). Otherwise, skip encoding.
scale_numericals : bool, default=True
If True, apply MinMaxScaler or StandardScaler (see `scaler_type`)
to the city’s “main” numeric features (e.g., rainfall, density,
seismic risk). Coordinates and targets are not scaled here.
scaler_type : {'minmax', 'standard'}, default='minmax'
Which scaler to apply when `scale_numericals=True`. - 'minmax'
uses `sklearn.preprocessing.MinMaxScaler`. - 'standard' uses
`sklearn.preprocessing.StandardScaler`.
as_frame : bool, default=False
If True, return the processed `pd.DataFrame` directly. Otherwise,
pack results into an `XBunch` with fields:
- `"frame"`: the processed DataFrame
- `"data"`: feature matrix (numpy array)
- `"feature_names"`: list of column names in `"data"`
- `"target_names"`: list of target column names (or `[]`)
- `"target"`: target array (or `None`)
- `"DESCR"`: textual description
- `"longitude"`, `"latitude"` (if `include_coords=True`)
- `"encoder_info"`: dict of one‐hot encoder details
- `"scaler_info"`: dict of scaler details
data_home : str or None, default=None
Root directory for caching and for locating local data files.
Passed to `fusionlab.datasets.get_data()`. If None, uses the
package’s default data directory.
use_cache : bool, default=True
If True, attempt to load a previously processed `.joblib` cache
(filename includes `data_name` and `cache_suffix`). If the cache
exists and is valid, skip reprocessing and return cached results.
save_cache : bool, default=False
If True, after successful processing, save the processed DataFrame
and encoder/scaler info to a `.joblib` file under `data_home`.
Subsequent calls with `use_cache=True` will load from this cache.
cache_suffix : str, default=''
Suffix to append to the cache filename (before `.joblib`). Useful
to distinguish different processing parameters or versions.
augment_data : bool, default=False
If True, apply spatio‐temporal augmentation via
`augment_city_spatiotemporal_data`. This can interpolate missing
values, add noise to features, and upsample the time dimension.
augment_mode : str, default='both'
Passed to the augmentation routine. Typical options include
`'both'` (interpolate + noise), `'interpolate_only'`,
`'noise_only'`. See `fusionlab.utils.geo_utils.augment_city_spatiotemporal_data`
for full details.
group_by_cols : list[str] or None, default=None
Which columns to group by during augmentation (e.g., coordinates).
If None, defaults to the city’s spatial columns (`lon_col`, `lat_col`).
time_col : str or None, default=None
Time column name (string) to pass to augmentation. If None,
uses the city’s configured `"time_col"`.
value_cols_interpolate : list[str] or None, default=None
Which numeric columns to interpolate during augmentation (e.g.,
`"GWL"`, `"rainfall_mm"`). If None, uses the city’s
`"default_value_cols_interpolate"` list.
feature_cols_augment : list[str] or None, default=None
Which columns to add noise to during augmentation. If None,
uses the city’s `"default_feature_cols_augment"` list. Note that
the target column is never noise‐augmented by default.
interpolation_config : dict or None, default=None
Configuration passed to the interpolation step of augmentation
(e.g., `{'freq': 'AS', 'method': 'linear'}`). If None, defaults
to `{'freq': 'AS', 'method': 'linear'}`.
feature_config : dict or None, default=None
Configuration for adding noise, e.g., `{'noise_level': 0.01,
'noise_type': 'gaussian'}`. If None, sensible defaults are used.
target_name : str or None, default=None
If `interpolate_target=True`, this names the column to interpolate
(default is the city’s `"subsidence_col"`). If not provided,
the function uses the configured subsidence column.
interpolate_target : bool, default=False
If True, include the target column itself in the interpolation
pass. (Useful when filling gaps in observed subsidence values.)
coordinate_precision : int or None, default=4
Number of decimal places to round latitude/longitude to. After
rounding, spatial grouping (e.g., augmentation) will treat points
at the same rounded coordinate as identical. Set to None to skip
coordinate rounding.
year_range : tuple[int, int] or None, default=None
If dummy data generation is used, the `(min_year, max_year)` range
for uniformly sampling integer years. If None, defaults to `(2000, 2025)`.
coords_range : tuple[tuple[float, float], tuple[float, float]] or None, default=None
If dummy generation is used, spatial bounds as
`((lon_min, lon_max), (lat_min, lat_max))`. If None, defaults to
`((113.0, 113.8), (22.3, 22.8))` for Zhongshan and a similar range
for Nansha.
vars_range : dict or None, default=None
If dummy generation is used, a dictionary specifying ranges for
other variables. For example:
`{"rainfall_mm": (500, 2500), "GWL": (1.0, 4.0)}`. If a variable
is omitted, its default distribution is used.
verbose : {0, 1, 2}, default=1
Controls verbosity of console output:
- 0: silent (except for exceptions).
- 1: high‐level info messages.
- 2: debug‐level messages (detailed shape/log prints).
Returns
-------
Union[pd.DataFrame, XBunch]
If `as_frame=True`, returns the processed DataFrame.
Otherwise, returns an XBunch with the following keys:
- frame: pandas DataFrame of processed data
- data: numpy array (rows × features) ready for modeling
- feature_names: list of column names corresponding to `data`
- target_names: list of target column names (or empty list)
- target: numpy array of target values (or None if `include_target=False`)
- DESCR: a multi‐line description string summarizing processing steps
- longitude, latitude: numpy arrays if `include_coords=True`
- encoder_info: dict containing one‐hot encoder metadata
- scaler_info: dict containing scaler metadata
Notes
-----
1. **Caching**: When `use_cache=True`, the function looks for a
`.joblib` file named `{data_name}_{basename}_processed{cache_suffix}.joblib`
under `data_home`. If found and valid, this file is loaded to
skip reprocessing. If `save_cache=True`, the final processed
DataFrame is saved to the same path for future reuse.
2. **Dummy Generation**: When `strategy='generate'` or when
fallback generation is triggered under `'fallback'`, the function
calls `generate_dummy_pinn_data(...)` to produce a synthetic
dataset. Users can override `year_range`, `coords_range`, and
`vars_range` to control the random distributions. See the
`fusionlab.utils.geo_utils.generate_dummy_pinn_data` docstring
for details on default behavior.
3. **Augmentation**: When `augment_data=True`, the function invokes
`augment_city_spatiotemporal_data(...)` with parameters:
- `group_by_cols`: columns used to group points (spatially)
- `time_col`: column used for temporal interpolation
- `value_cols_interpolate`: numeric columns to interpolate
- `feature_cols_augment`: columns to which noise is added
- `interpolation_config`: interpolation parameters (freq/method)
- `feature_config`: noise configuration (level/type)
- `coordinate_precision`: precision used to round coords before grouping
Ensure that `fusionlab.utils.geo_utils.augment_city_spatiotemporal_data`
is available in your installation if using augmentation.
4. **One‐Hot Encoding**: Only the configured categorical columns
(e.g., `'geology'`, `'density_tier'`) are encoded. All other
string columns remain unchanged.
5. **Numeric Scaling**: Only the city’s `numerical_main` features
(e.g., rainfall, density, seismic risk) are passed through the
chosen scaler. Coordinates, time numeric columns, and targets
are not scaled here; downstream models or sequence preprocessors
may handle those separately.
Examples
--------
**1. Simple load of processed Zhongshan data (no caching, no augmentation):**
>>> from fusionlab.datasets.load import load_subsidence_pinn_data
>>> zbunch = load_subsidence_pinn_data(
... data_name='zhongshan',
... strategy='load',
... use_cache=False,
... encode_categoricals=True,
... scale_numericals=True,
... scaler_type='minmax',
... as_frame=False,
... verbose=1
... )
>>> print(zbunch.frame.head())
year longitude latitude GWL rainfall_mm density_tier_... ...
2000 113.05 22.35 3.2 1200.0 ...
**2. Force generation of 100 000 dummy Nansha samples, skip encoding:**
>>> nbunch = load_subsidence_pinn_data(
... data_name='nansha',
... strategy='generate',
... n_samples=100000,
... encode_categoricals=False,
... scale_numericals=True,
... scaler_type='standard',
... save_cache=True,
... cache_suffix='_v1',
... verbose=2
... )
>>> print(nbunch.data.shape)
(100000, 5) # e.g., 5 numeric features
**3. Load Zhongshan data, but fallback to dummy if file missing, then
apply augmentation with yearly interpolation and Gaussian noise:**
>>> zbunch_aug = load_subsidence_pinn_data(
... data_name='zhongshan',
... strategy='fallback',
... use_cache=False,
... augment_data=True,
... augment_mode='both',
... interpolation_config={'freq':'YS','method':'linear'},
... feature_config={'noise_level':0.02,'noise_type':'gaussian'},
... verbose=1
... )
>>> print(zbunch_aug.frame.shape)
(e.g., 550000, 12) # Augmented rows added after interpolation
"""
[docs]
def fetch_zhongshan_data(
    *,
    n_samples: Optional[Union[int, str]] = None,
    as_frame: bool = False,
    include_coords: bool = True,
    include_target: bool = True,
    data_home: Optional[str] = None,
    download_if_missing: bool = True,
    force_download: bool = False,
    random_state: Optional[int] = None,
    verbose: bool = True
) -> Union[XBunch, pd.DataFrame]:
    r"""Fetch the Zhongshan land subsidence dataset (sampled 2000 points).

    Loads the `zhongshan_2000.csv` file, which contains features
    related to land subsidence spatially sampled down to ~2000 points
    from a larger dataset [Liu24]_. Includes coordinates, year,
    hydrogeological factors, geological properties, risk scores, and
    measured subsidence (target).

    Optionally allows further sub-sampling using the `n_samples`
    parameter via :func:`~fusionlab.utils.spatial_utils.spatial_sampling`.

    Column details: 'longitude', 'latitude', 'year', 'GWL',
    'seismic_risk_score', 'rainfall_mm', 'subsidence',
    'geological_category', 'normalized_density', 'density_tier',
    'subsidence_intensity', 'density_concentration',
    'normalized_seismic_risk_score', 'rainfall_category'.

    Parameters
    ----------
    n_samples : int, str or None, default=None
        Number of samples to load.

        - If ``None`` or ``'*'``: Load the full sampled dataset
          (~2000 rows).
        - If `int`: Sub-sample the specified number using spatial
          stratification via
          :func:`~fusionlab.utils.spatial_utils.spatial_sampling`.
          If the value exceeds the number of rows in the file, a
          warning is issued and the full dataset is returned. If the
          coordinate columns are absent, simple random sampling is
          used instead.
    as_frame : bool, default=False
        Return type: ``False`` for Bunch object, ``True`` for DataFrame.
        The DataFrame is sorted by 'year' when that column is present.
    include_coords : bool, default=True
        Include 'longitude' and 'latitude' columns.
    include_target : bool, default=True
        Include the 'subsidence' column.
    data_home : str, optional
        Path to cache directory. Defaults to ``~/fusionlab_data``.
    download_if_missing : bool, default=True
        Attempt download if file is not found locally.
    force_download : bool, default=False
        Force download attempt even if file exists locally.
    random_state : int, optional
        Seed for the random number generator used during sub-sampling
        if `n_samples` is an integer. Ensures reproducibility.
    verbose : bool, default=True
        Print status messages during file fetching and sampling.

    Returns
    -------
    data : :class:`~fusionlab.api.bunch.XBunch` or pandas.DataFrame
        Loaded or sampled data. Bunch object includes `frame`, `data`,
        `feature_names`, `target_names`, `target`, coords, and `DESCR`.

    Raises
    ------
    ValueError
        If `n_samples` is not a positive integer, ``'*'`` or ``None``
        (booleans are rejected).
    FileNotFoundError, RuntimeError
        If the dataset file cannot be found or downloaded.
    OSError
        If there is an error reading the dataset file.

    Warns
    -----
    UserWarning
        If `n_samples` exceeds the available rows, if the coordinate
        columns are missing when sub-sampling, or if the target column
        is requested but absent.

    References
    ----------
    .. [Liu24] Liu, J., et al. (2024). Machine learning-based techniques...
       *Journal of Environmental Management*, 352, 120078.
    """
    from ..utils.spatial_utils import spatial_sampling

    # --- Step 1: Obtain filepath using helper ---
    filepath_to_load = download_file_if(
        metadata=_ZHONGSHAN_METADATA, data_home=data_home,
        download_if_missing=download_if_missing,
        force_download=force_download, error='raise',
        verbose=verbose
    )

    # --- Step 2: Load data ---
    try:
        df = pd.read_csv(filepath_to_load)
        if verbose:
            print(f"Successfully loaded full data ({len(df)} rows)"
                  f" from: {filepath_to_load}")
    except Exception as e:
        raise OSError(
            f"Error reading dataset file at {filepath_to_load}: {e}"
        ) from e

    # --- Step 3: Optional Sub-sampling ---
    if n_samples is not None and n_samples != '*':
        # bool is a subclass of int; reject it explicitly so that
        # n_samples=True is not silently treated as "one sample".
        if (
            isinstance(n_samples, bool)
            or not isinstance(n_samples, int)
            or n_samples <= 0
        ):
            raise ValueError(f"`n_samples` must be a positive integer"
                             f" or '*' or None. Got {n_samples}.")
        total_rows = len(df)
        if n_samples > total_rows:
            warnings.warn(
                f"Requested n_samples ({n_samples}) is larger than "
                f"available rows ({total_rows}). Returning full dataset."
            )
        elif 'longitude' not in df.columns or 'latitude' not in df.columns:
            warnings.warn(
                "Coordinate columns ('longitude', 'latitude') not found. "
                "Using simple random sampling instead of spatial sampling."
            )
            df = df.sample(n=n_samples, random_state=random_state)
            if verbose:
                print(f"Performed simple random sampling: {len(df)} rows.")
        else:
            # Use spatial sampling
            if verbose:
                print(f"Performing spatial sampling for {n_samples} rows...")
            # Use verbose level 1 for spatial_sampling basic info
            sample_verbose = 1 if verbose else 0
            df = spatial_sampling(
                df,
                sample_size=n_samples,
                spatial_cols=('longitude', 'latitude'),
                random_state=random_state,
                verbose=sample_verbose
            )
            if verbose:
                print(f"Spatial sampling complete: {len(df)} rows selected.")
    elif verbose:
        print("Loading full dataset (n_samples is None or '*').")

    # --- Step 4: Column Selection ---
    # Output column order: coordinates (optional), features, target
    # (optional).
    coord_cols = ['longitude', 'latitude']
    target_col = 'subsidence'
    cols_to_keep = []
    if include_coords:
        cols_to_keep.extend(c for c in coord_cols if c in df.columns)
    cols_to_keep.extend(
        c for c in df.columns
        if c not in coord_cols and c != target_col
    )
    if include_target:
        if target_col in df.columns:
            cols_to_keep.append(target_col)
        else:
            warnings.warn(f"Target column '{target_col}' not found.")
    df_subset = df[cols_to_keep].copy()

    # --- Step 5: Return DataFrame or Bunch ---
    if as_frame:
        # Sort chronologically when possible; a file without a 'year'
        # column is returned unsorted instead of raising KeyError.
        if 'year' in df_subset.columns:
            df_subset = df_subset.sort_values('year')
        return df_subset

    # Assemble Bunch object
    target_names = (
        [target_col]
        if include_target and target_col in df_subset
        else []
    )
    target_array = (
        df_subset[target_names].values.ravel() if target_names else None
    )
    bunch_feature_names = [
        c for c in df_subset.columns
        if c not in coord_cols + target_names
    ]
    # NOTE(review): `data` keeps only the numeric feature columns while
    # `feature_names` lists every feature column (categorical ones
    # included), so the two can differ in length. Left unchanged for
    # backward compatibility; confirm the intended contract.
    try:
        data_array = df_subset[bunch_feature_names].select_dtypes(
            include=np.number).values
    except Exception:
        data_array = None
        warnings.warn("Could not extract numerical data for Bunch.data")

    # Update description based on actual loaded/sampled size
    descr = textwrap.dedent(f"""\
        Zhongshan Land Subsidence Dataset (Raw Features)
        **Origin:**
        Spatially stratified sample from the dataset used in [Liu24]_,
        focused on Zhongshan, China. Contains raw features potentially
        influencing land subsidence. This function loads the pre-sampled
        'zhongshan_2000.csv' file and optionally sub-samples it further.
        **Data Characteristics (Loaded/Sampled):**
        - Samples: {len(df_subset)}
        - Total Columns Loaded: {len(df_subset.columns)}
        - Feature Columns (in Bunch): {len(bunch_feature_names)}
        - Target Column ('subsidence'): {'Present' if target_names else 'Not Loaded'}
        **Available Columns in Frame:** {', '.join(df_subset.columns)}
        """)  # Removed full Bunch contents for brevity

    bunch_dict = {
        "frame": df_subset, "data": data_array,
        "feature_names": bunch_feature_names,
        "target_names": target_names, "target": target_array,
        "DESCR": descr,
    }
    if include_coords:
        if 'longitude' in df_subset:
            bunch_dict['longitude'] = df_subset['longitude'].values
        if 'latitude' in df_subset:
            bunch_dict['latitude'] = df_subset['latitude'].values
    return XBunch(**bunch_dict)
[docs]
def fetch_nansha_data(
    *,
    n_samples: Optional[Union[int, str]] = None,
    as_frame: bool = False,
    include_coords: bool = True,
    include_target: bool = True,
    data_home: Optional[str] = None,
    download_if_missing: bool = True,
    force_download: bool = False,
    random_state: Optional[int] = None,
    verbose: bool = True
) -> Union[XBunch, pd.DataFrame]:
    r"""Load the sampled Nansha land subsidence dataset (2000 points).

    Reads ``nansha_2000.csv``, a spatially sampled table of features
    related to land subsidence in Nansha, China. The file is located
    through :func:`~fusionlab.datasets._property.download_file_if`
    (Cache > Package > Download), read with pandas, optionally
    sub-sampled, and finally trimmed to the requested columns.

    Expected columns: 'longitude', 'latitude', 'year',
    'building_concentration', 'geology', 'GWL', 'rainfall_mm',
    'normalized_seismic_risk_score', 'soil_thickness', 'subsidence'.

    Parameters
    ----------
    n_samples : int, str or None, default=None
        How many rows to return.

        - ``None`` or ``'*'``: keep every row of the file (~2000 rows).
        - positive `int`: sub-sample that many rows. When both
          coordinate columns are present the rows are drawn with
          :func:`~fusionlab.utils.spatial_utils.spatial_sampling`;
          otherwise a warning is issued and plain random sampling is
          used. A value larger than the number of available rows
          only triggers a warning and the full dataset is returned.
    as_frame : bool, default=False
        ``True`` returns the selected DataFrame; ``False`` returns an
        :class:`~fusionlab.api.bunch.XBunch`.
    include_coords : bool, default=True
        Keep the 'longitude' and 'latitude' columns (and expose them
        as Bunch attributes).
    include_target : bool, default=True
        Keep the 'subsidence' column. A warning is issued if it is
        requested but absent from the file.
    data_home : str, optional
        Path to cache directory. Defaults to ``~/fusionlab_data``.
    download_if_missing : bool, default=True
        Attempt download if file is not found locally.
    force_download : bool, default=False
        Force download attempt even if file exists locally.
    random_state : int, optional
        Seed forwarded to the sampling routine.
    verbose : bool, default=True
        Print status messages during file fetching and sampling.

    Returns
    -------
    data : :class:`~fusionlab.api.bunch.XBunch` or pandas.DataFrame
        The loaded (or sampled) data. The Bunch carries `frame`,
        `data` (numeric feature columns only), `feature_names`,
        `target_names`, `target`, `DESCR` and, when requested and
        present, `longitude` / `latitude`.

    Raises
    ------
    ValueError
        If `n_samples` is neither ``None``, ``'*'`` nor a positive int.
    FileNotFoundError, RuntimeError
        If the dataset file cannot be found or downloaded.
    OSError
        If there is an error reading the dataset file.
    """
    from ..utils.spatial_utils import spatial_sampling

    # --- Locate the CSV (cache, packaged copy or download) ---
    csv_path = download_file_if(
        metadata=_NANSHA_METADATA,
        data_home=data_home,
        download_if_missing=download_if_missing,
        force_download=force_download,
        error='raise',
        verbose=verbose
    )

    # --- Read it; any failure is surfaced as OSError ---
    try:
        frame = pd.read_csv(csv_path)
        if verbose:
            print(f"Successfully loaded full data ({len(frame)} rows)"
                  f" from: {csv_path}")
    except Exception as e:
        raise OSError(
            f"Error reading dataset file at {csv_path}: {e}"
        ) from e

    # --- Optional sub-sampling ---
    if n_samples is None or n_samples == '*':
        if verbose:
            print("Loading full dataset (n_samples is None or '*').")
    else:
        is_positive_int = isinstance(n_samples, int) and n_samples > 0
        if not is_positive_int:
            raise ValueError(f"`n_samples` must be a positive integer"
                             f" or '*' or None. Got {n_samples}.")

        n_available = len(frame)
        has_coords = (
            'longitude' in frame.columns and 'latitude' in frame.columns
        )
        if n_samples > n_available:
            # Oversized request: keep everything, just tell the user.
            warnings.warn(
                f"Requested n_samples ({n_samples}) is larger than "
                f"available rows ({n_available}). Returning full dataset."
            )
        elif not has_coords:
            # Spatial stratification is impossible without coordinates.
            warnings.warn(
                "Coordinate columns ('longitude', 'latitude') not found. "
                "Using simple random sampling."
            )
            frame = frame.sample(n=n_samples, random_state=random_state)
            if verbose:
                print(f"Performed simple random sampling: {len(frame)} rows.")
        else:
            if verbose:
                print(f"Performing spatial sampling for {n_samples} rows...")
            frame = spatial_sampling(
                frame,
                sample_size=n_samples,
                spatial_cols=('longitude', 'latitude'),
                random_state=random_state,
                verbose=int(bool(verbose))
            )
            if verbose:
                print(f"Spatial sampling complete: {len(frame)} rows selected.")

    # --- Column selection: coords first, then features, target last ---
    coord_cols = ['longitude', 'latitude']
    target_col = 'subsidence'
    non_feature = set(coord_cols) | {target_col}

    selected = []
    if include_coords:
        selected += [name for name in coord_cols if name in frame.columns]
    selected += [name for name in frame.columns if name not in non_feature]
    if include_target:
        if target_col in frame.columns:
            selected.append(target_col)
        else:
            warnings.warn(f"Target column '{target_col}' not found.")
    subset = frame[selected].copy()

    if as_frame:
        return subset

    # --- Assemble the Bunch ---
    has_target = include_target and target_col in subset
    target_names = [target_col] if has_target else []
    target_array = None
    if target_names:
        target_array = subset[target_names].values.ravel()

    hidden_from_features = coord_cols + target_names
    bunch_feature_names = [
        name for name in subset.columns
        if name not in hidden_from_features
    ]
    try:
        # Only numeric feature columns end up in ``data``.
        numeric_part = subset[bunch_feature_names].select_dtypes(
            include=np.number)
        data_array = numeric_part.values
    except Exception:
        data_array = None
        warnings.warn("Could not extract numerical data for Bunch.data")

    n_rows = len(subset)
    n_cols = len(subset.columns)
    target_status = 'Present' if target_names else 'Not Loaded'
    descr = textwrap.dedent(f"""\
    Sampled Nansha Land Subsidence Dataset (Raw Features)
    **Origin:**
    Spatially stratified sample (n={n_rows}) focused on
    Nansha, China. Contains raw features potentially influencing
    land subsidence, including geological info, building concentration,
    hydrogeology, etc.
    **Data Characteristics (Loaded/Sampled):**
    - Samples: {n_rows}
    - Total Columns Loaded: {n_cols}
    - Feature Columns (in Bunch): {len(bunch_feature_names)}
    - Target Column ('subsidence'): {target_status}
    **Available Columns in Frame:** {', '.join(subset.columns)}
    """)

    bunch_fields = {
        "frame": subset,
        "data": data_array,
        "feature_names": bunch_feature_names,
        "target_names": target_names,
        "target": target_array,
        "DESCR": descr,
    }
    if include_coords:
        for axis_name in ('longitude', 'latitude'):
            if axis_name in subset:
                bunch_fields[axis_name] = subset[axis_name].values
    return XBunch(**bunch_fields)
[docs]
def load_processed_subsidence_data(
    dataset_name: str = 'zhongshan',
    *,
    n_samples: Optional[Union[int, str]] = None,
    as_frame: bool = False,
    include_coords: bool = True,
    include_target: bool = True,
    data_home: Optional[str] = None,
    download_if_missing: bool = True,
    force_download_raw: bool = False,
    random_state: Optional[int] = None,
    apply_feature_select: bool = True,
    apply_nan_ops: bool = True,
    encode_categoricals: bool = True,
    scale_numericals: bool = True,
    scaler_type: str = 'minmax',
    return_sequences: bool = False,
    time_steps: int = 4,
    forecast_horizon: int = 4,
    target_col: str = 'subsidence',
    scale_target: bool = False,
    group_by_cols: Union[bool, List[str], None] = True,
    use_processed_cache: bool = True,
    use_sequence_cache: bool = True,
    save_processed_frame: bool = False,
    save_sequences: bool = False,
    cache_suffix: str = "",
    nan_handling_method: Optional[str] = 'fill',
    verbose: bool = True
) -> Union[XBunch, pd.DataFrame, Tuple[np.ndarray, ...]]:
    r"""Load, preprocess, and optionally sequence land subsidence datasets.

    This function provides a complete pipeline to prepare the Zhongshan
    or Nansha land subsidence datasets for use with forecasting models
    like TFT/XTFT. It performs the following steps:

    1. Loads the raw sampled data ('zhongshan_2000.csv' or
       'nansha_2000.csv') using the fetch functions
       (:func:`fetch_zhongshan_data` or :func:`fetch_nansha_data`),
       optionally sub-sampling if `n_samples` is specified.
    2. Optionally applies a predefined preprocessing sequence, mirroring
       steps often used in research (e.g., based on [Liu24]_):

       - Feature Selection (selecting a subset of columns).
       - NaN Handling (e.g., filling missing values).
       - Categorical Encoding (using One-Hot Encoding).
       - Numerical Scaling (using MinMaxScaler or StandardScaler).
    3. Optionally reshapes the fully processed data into sequences
       suitable for TFT/XTFT models using ``reshape_xtft_data``.
    4. Optionally leverages caching by loading/saving the processed
       DataFrame or the final sequence arrays to/from disk (`.joblib`)
       to accelerate repeated executions with the same parameters.

    Parameters
    ----------
    dataset_name : {'zhongshan', 'nansha'}, default='zhongshan'
        Which dataset to load and process ('zhongshan' or 'nansha').
    n_samples : int, str, or None, default=None
        Number of samples to load from the raw dataset file. Forwarded
        unchanged to the fetch function.

        - If ``None`` or ``'*'``: Loads the full dataset (~2000 rows).
        - If `int`: Sub-samples the specified number of rows.
    as_frame : bool, default=False
        Determines the return type *only if* ``return_sequences`` is
        ``False``.

        - If ``False``: Returns a Bunch object containing the processed
          DataFrame and metadata.
        - If ``True``: Returns only the processed pandas DataFrame.
    include_coords : bool, default=True
        If ``True``, expose ``longitude`` and ``latitude`` as attributes
        of the returned Bunch. The processed ``frame`` itself always
        keeps the coordinate columns.
    include_target : bool, default=True
        If ``True``, expose the target through the Bunch attributes
        ``target_names`` and ``target``. The processed ``frame`` itself
        always keeps the target column; the target is never listed in
        the Bunch ``feature_names``.
    data_home : str, optional
        Specify a directory path to cache raw datasets and processed
        files. If ``None``, uses the path determined by
        :func:`~fusionlab.datasets._property.get_data`
        (typically ``~/fusionlab_data``). Default is ``None``.
    download_if_missing : bool, default=True
        If ``True``, attempt to download the raw dataset file from the
        remote repository if it's not found locally.
    force_download_raw : bool, default=False
        If ``True``, forces download of the raw dataset file, ignoring
        any local cache or packaged version.
    random_state : int, optional
        Seed for the random number generator used during sub-sampling
        when ``n_samples`` is an integer. Ensures reproducibility.
    apply_feature_select : bool, default=True
        If ``True``, keeps only the subset of features typically used
        in reference examples for the specified `dataset_name`, plus
        `target_col` when it is not already part of that subset. If
        ``False``, all loaded columns are kept.
    apply_nan_ops : bool, default=True
        If ``True``, apply NaN handling using the internal :func:`nan_ops`
        utility with the strategy specified by ``nan_handling_method``.
    encode_categoricals : bool, default=True
        If ``True``, apply Scikit-learn's OneHotEncoder to predefined
        categorical columns ('geology', 'density_tier' for Zhongshan;
        'geology', 'building_concentration' for Nansha). Adds new
        columns for encoded features and removes the original
        categorical columns.
    scale_numericals : bool, default=True
        If ``True``, apply feature scaling to predefined numerical columns
        (excluding coordinates, year, and encoded categoricals) using
        the scaler specified by ``scaler_type``. The target column is
        scaled only when ``scale_target=True``.
    scaler_type : {'minmax', 'standard'}, default='minmax'
        Type of scaler to use if `scale_numericals` is True.
    return_sequences : bool, default=False
        Controls the final output format.

        - If ``True``: Performs sequence generation using
          ``reshape_xtft_data`` and returns the sequence arrays.
        - If ``False``: Skips sequence generation and returns the
          processed DataFrame or Bunch object (controlled by `as_frame`).
    time_steps : int, default=4
        Lookback window size (number of past time steps) for sequence
        generation. Only used if ``return_sequences=True``.
    forecast_horizon : int, default=4
        Prediction horizon (number of future steps) for sequence
        generation. Only used if ``return_sequences=True``.
    target_col : str, default='subsidence'
        Name of the target variable column.
    scale_target : bool, default=False
        If ``True`` (and ``scale_numericals=True``), the target column
        is scaled together with the numerical features.
    group_by_cols : bool or list of str, default=True
        Controls how the data is partitioned before sequence generation.
        Only used if ``return_sequences=True``.

        - ``True`` (default): group by the spatial columns
          (``['longitude', 'latitude']``).
        - list of str: names of one or more DataFrame columns to group
          by; each unique group produces its own set of sequences. The
          columns must still exist after preprocessing.
        - ``False`` or ``None``: do not group by any columns; the entire
          dataset is treated as a single continuous time series.
    use_processed_cache : bool, default=True
        If ``True``, attempts to load a previously saved processed
        DataFrame (and scaler/encoder info) from the cache directory
        before running the preprocessing steps.
    use_sequence_cache : bool, default=True
        If ``True`` and ``return_sequences=True``, attempts to load
        previously saved sequence arrays from the cache directory before
        running preprocessing and sequence generation.
    save_processed_frame : bool, default=False
        If ``True`` and preprocessing is performed (cache miss or
        ``use_processed_cache=False``), saves the resulting processed
        DataFrame, scaler info, and encoder info to a joblib file in
        the cache directory.
    save_sequences : bool, default=False
        If ``True`` and sequence generation is performed (cache miss or
        ``use_sequence_cache=False``), saves the resulting sequence
        arrays (`static_data`, `dynamic_data`, `future_data`,
        `target_data`) to a joblib file in the cache directory. Only used
        if ``return_sequences=True``.
    cache_suffix : str, default=""
        Optional suffix appended to cache filenames (before '.joblib')
        to allow caching results from different processing variations
        (e.g., different `n_samples` or preprocessing flags). Cache
        filenames otherwise depend only on `dataset_name` (and on
        `time_steps` / `forecast_horizon` for sequences).
    nan_handling_method : str, default='fill'
        Method used by :func:`nan_ops` if ``apply_nan_ops=True``.
        Typically 'fill' (forward fill then backward fill).
    verbose : bool, default=True
        If ``True``, print status messages during file fetching,
        processing, caching, and sequence generation.

    Returns
    -------
    Processed Data : Union[XBunch, pd.DataFrame, Tuple[np.ndarray, ...]]
        The type depends on `return_sequences` and `as_frame`:

        - If `return_sequences=True`: Returns the sequence arrays
          required by TFT/XTFT:
          ``(static_data, dynamic_data, future_data, target_data)``
        - If `return_sequences=False` and `as_frame=True`: Returns the
          fully processed pandas DataFrame (after selection, NaN handling,
          encoding, scaling).
        - If `return_sequences=False` and `as_frame=False`: Returns a
          :class:`~fusionlab.api.bunch.XBunch` object containing the
          processed DataFrame (`frame`), extracted numerical features
          (`data`), feature names (`feature_names`), target info
          (`target_names`, `target`), coordinates (`longitude`,
          `latitude`), and a description (`DESCR`).

    Raises
    ------
    ValueError
        If `dataset_name` or `scaler_type` is invalid, if required
        columns are missing for selected processing steps, or if
        sequence generation fails.
    FileNotFoundError, RuntimeError, OSError
        If underlying raw data loading fails (fetching from cache,
        package, or download).

    References
    ----------
    .. [Liu24] Liu, J., et al. (2024). Machine learning-based techniques...
       *Journal of Environmental Management*, 352, 120078.

    Examples
    --------
    >>> from fusionlab.datasets import load_processed_subsidence_data
    >>> # Load processed Zhongshan data as a Bunch object
    >>> data_bunch = load_processed_subsidence_data(dataset_name='zhongshan',
    ...                                             as_frame=False,
    ...                                             return_sequences=False)
    >>> print(data_bunch.frame.head())
    >>> print(data_bunch.feature_names)
    >>> # Load Nansha data, preprocess, and return sequences
    >>> static, dynamic, future, target = load_processed_subsidence_data(
    ...     dataset_name='nansha',
    ...     return_sequences=True,
    ...     time_steps=6,
    ...     forecast_horizon=3,
    ...     scale_numericals=True,
    ...     scaler_type='standard',
    ...     verbose=False
    ... )
    >>> print(f"Nansha sequences shapes: S={static.shape}, D={dynamic.shape},"
    ...       f" F={future.shape}, y={target.shape}")
    >>> # Load a small sample and save processed frame
    >>> df_proc_sample = load_processed_subsidence_data(
    ...     dataset_name='zhongshan',
    ...     n_samples=100,
    ...     random_state=42,
    ...     as_frame=True,
    ...     return_sequences=False,
    ...     save_processed_frame=True,
    ...     cache_suffix="_sample100"
    ... )
    >>> print(f"Loaded and processed sample shape: {df_proc_sample.shape}")
    """
    from ..utils.io_utils import fetch_joblib_data
    from ..nn.utils import reshape_xtft_data
    from ..utils.data_utils import nan_ops

    # --- Configuration based on dataset name ---
    if dataset_name == 'zhongshan':
        fetch_func = fetch_zhongshan_data
        default_features = [  # Features used in paper example
            'longitude', 'latitude', 'year', 'GWL', 'rainfall_mm',
            'geology', 'normalized_density', 'density_tier',
            'normalized_seismic_risk_score', 'subsidence'
        ]
        categorical_cols = ['geology', 'density_tier']
        # Numerical cols excluding coords, year, target, categoricals
        numerical_cols = [
            'GWL', 'rainfall_mm', 'normalized_density',
            'normalized_seismic_risk_score'
        ]
        static_prefixes = ('geology_', 'density_tier_')
    elif dataset_name == 'nansha':
        fetch_func = fetch_nansha_data
        default_features = [  # Features listed for Nansha
            'longitude', 'latitude', 'year', 'building_concentration',
            'geology', 'GWL', 'rainfall_mm',
            'normalized_seismic_risk_score', 'soil_thickness', 'subsidence'
        ]
        categorical_cols = ['geology', 'building_concentration']
        numerical_cols = [
            'GWL', 'rainfall_mm',
            'normalized_seismic_risk_score', 'soil_thickness'
        ]
        static_prefixes = ('geology_', 'building_concentration_')
    else:
        raise ValueError(f"Unknown dataset_name: '{dataset_name}'."
                         " Choose 'zhongshan' or 'nansha'.")
    spatial_cols = ['longitude', 'latitude']
    dt_col = 'year'  # Time column used for sorting and reshaping

    # --- Define cache filenames ---
    data_dir = get_data(data_home)
    processed_fname = f"{dataset_name}_processed{cache_suffix}.joblib"
    processed_fpath = os.path.join(data_dir, processed_fname)
    seq_fname = (f"{dataset_name}_sequences_T{time_steps}_H{forecast_horizon}"
                 f"{cache_suffix}.joblib")
    seq_fpath = os.path.join(data_dir, seq_fname)

    # --- Try loading cached sequences ---
    if return_sequences and use_sequence_cache:
        try:
            sequences_data = fetch_joblib_data(
                seq_fpath, 'static_data', 'dynamic_data',
                'future_data', 'target_data', verbose=verbose > 1,
                error_mode='raise'
            )
            if verbose:
                print(f"Loaded cached sequences from: {seq_fpath}")
            return sequences_data
        except FileNotFoundError:
            if verbose > 0:
                print(f"Sequence cache not found: {seq_fpath}")
        except Exception as e:
            # Best effort: a broken cache must not prevent reprocessing.
            warnings.warn(
                f"Error loading cached sequences: {e}. Reprocessing.")

    # --- Try loading cached processed DataFrame ---
    df_processed = None
    # Scaler/encoder info is either loaded from the cache or rebuilt below.
    scaler_info = {'columns': [], 'scaler': None}
    encoder_info = {'columns': {}, 'names': {}}
    if use_processed_cache:
        try:
            cached_proc = fetch_joblib_data(
                processed_fpath, 'data', 'scaler_info', 'encoder_info',
                verbose=verbose > 1, error_mode='ignore'
            )
            if cached_proc and isinstance(cached_proc, dict):
                df_processed = cached_proc.get('data')
                scaler_info = cached_proc.get('scaler_info', scaler_info)
                encoder_info = cached_proc.get('encoder_info', encoder_info)
                if df_processed is not None and verbose:
                    print(
                        f"Loaded cached processed DataFrame: {processed_fpath}")
            elif verbose > 0:
                print(
                    f"Processed cache invalid/not found: {processed_fpath}")
        except FileNotFoundError:
            if verbose > 0:
                print(
                    f"Processed cache not found: {processed_fpath}")
        except Exception as e:
            warnings.warn(
                f"Error loading cached processed data: {e}. Reprocessing.")
            df_processed = None

    # --- Perform processing on cache miss ---
    if df_processed is None:
        if verbose:
            print("Processing data from raw file...")

        # 1. Load raw data (always with coords and target; they are
        #    needed by the later processing / reshaping steps).
        df_raw = fetch_func(
            as_frame=True,
            n_samples=n_samples,
            data_home=data_home,
            download_if_missing=download_if_missing,
            force_download=force_download_raw,
            random_state=random_state,
            verbose=verbose > 1,
            include_coords=True,
            include_target=True
        )
        df_processed = df_raw.copy()
        df_processed.sort_values(dt_col, inplace=True)

        # 2. Feature selection
        if apply_feature_select:
            # Keep the target even when a custom `target_col` is not part
            # of the default feature list; otherwise it would be validated
            # here and then silently dropped by the selection.
            selected_cols = list(default_features)
            if target_col not in selected_cols:
                selected_cols.append(target_col)
            missing_fs = [
                f for f in selected_cols if f not in df_processed.columns
            ]
            if missing_fs:
                raise ValueError(f"Required features missing from raw data:"
                                 f" {missing_fs}")
            df_processed = df_processed[selected_cols].copy()
            if verbose:
                print(f" Applied feature selection. Kept: {selected_cols}")

        # 3. NaN handling
        if apply_nan_ops:
            original_len = len(df_processed)
            df_processed = nan_ops(
                df_processed, ops='sanitize',
                action=nan_handling_method,
                process="do_anyway",
                verbose=verbose > 1
            )
            if verbose:
                print(f" Applied NaN handling ('{nan_handling_method}')."
                      f" Rows removed: {original_len - len(df_processed)}")

        # 4. Categorical encoding
        encoder_info = {'columns': {}, 'names': {}}  # Reset encoder info
        if encode_categoricals:
            if verbose:
                print(" Encoding categorical features...")
            # Non-categorical columns are carried over unchanged.
            cols_to_keep_temp = df_processed.columns.difference(
                categorical_cols).tolist()
            df_encoded_list = [df_processed[cols_to_keep_temp]]
            for col in categorical_cols:
                if col not in df_processed.columns:
                    warnings.warn(f"Categorical column '{col}' not found.")
                    continue
                encoder = OneHotEncoder(
                    sparse_output=False,
                    handle_unknown='ignore',
                    dtype=np.float32
                )  # Ensure float output
                encoded_data = encoder.fit_transform(df_processed[[col]])
                new_cols = [f"{col}_{cat}" for cat in encoder.categories_[0]]
                encoded_df = pd.DataFrame(encoded_data,
                                          columns=new_cols,
                                          index=df_processed.index)
                df_encoded_list.append(encoded_df)
                encoder_info['columns'][col] = new_cols
                encoder_info['names'][col] = encoder.categories_[0]
                if verbose > 1:
                    print(f" Encoded '{col}' -> {len(new_cols)} cols")
            # Combine original non-categorical with new encoded columns
            df_processed = pd.concat(df_encoded_list, axis=1)
        elif verbose:
            print(" Skipped categorical encoding.")

        # 5. Numerical scaling
        scaler_info = {'columns': [], 'scaler': None}  # Reset scaler info
        if scale_numericals:
            # Scale only the predefined numerical columns that are present
            # and numeric, never the one-hot encoded ones. The order of
            # `numerical_cols` is preserved so the fitted scaler and
            # `scaler_info['columns']` are reproducible across runs.
            current_num_cols = set(
                df_processed.select_dtypes(include=np.number).columns)
            encoded_flat = {
                item for sublist in encoder_info['columns'].values()
                for item in sublist
            }
            cols_to_scale = [
                c for c in numerical_cols
                if c in current_num_cols and c not in encoded_flat
            ]
            if scale_target:
                if (target_col in df_processed.columns
                        and target_col not in cols_to_scale):
                    cols_to_scale.append(target_col)
            else:
                # For consistency, never scale the target unless asked to.
                cols_to_scale = [
                    c for c in cols_to_scale if c != target_col
                ]

            if cols_to_scale:
                if verbose:
                    print(f" Scaling numerical features: {cols_to_scale}...")
                if scaler_type == 'minmax':
                    scaler = MinMaxScaler()
                elif scaler_type == 'standard':
                    scaler = StandardScaler()
                else:
                    raise ValueError(f"Unknown scaler_type: {scaler_type}")
                df_processed[cols_to_scale] = scaler.fit_transform(
                    df_processed[cols_to_scale])
                scaler_info['columns'] = cols_to_scale
                scaler_info['scaler'] = scaler  # Store fitted scaler
            elif verbose:
                print(" No numerical columns found/left to scale.")
        elif verbose:
            print(" Skipped numerical scaling.")

        # 6. Save processed DataFrame if requested
        if save_processed_frame:
            save_data = {
                'data': df_processed,
                'scaler_info': scaler_info,
                'encoder_info': encoder_info
            }
            try:
                joblib.dump(save_data, processed_fpath)
                if verbose:
                    print(f"Saved processed DataFrame to: {processed_fpath}")
            except Exception as e:
                # Saving is best effort; the processed data is still returned.
                warnings.warn(f"Failed to save processed data: {e}")

    # --- Return processed DataFrame or Bunch if sequences not requested ---
    if not return_sequences:
        if as_frame:
            return df_processed

        target_names = ([target_col] if include_target and
                        target_col in df_processed else [])
        target_array = df_processed[target_names].values.ravel(
            ) if target_names else None
        # The target is never a feature, even when `include_target=False`
        # hides it from `target_names` / `target`.
        non_feature_cols = set(spatial_cols) | {target_col}
        bunch_feature_names = [
            c for c in df_processed.columns
            if c not in non_feature_cols
        ]
        try:
            data_array = df_processed[bunch_feature_names].select_dtypes(
                include=np.number).values
        except Exception:
            data_array = None
            warnings.warn("Could not extract numerical data for Bunch.data")

        descr = textwrap.dedent(f"""\
        Processed {dataset_name.capitalize()} Landslide Dataset
        (Processing based on [Liu24] Zhongshan Example)
        **Origin:** See fetch_{dataset_name}_data docstring.
        **Processing Applied:** Feature Selection={apply_feature_select},
        NaN Handling='{nan_handling_method if apply_nan_ops else 'None'}',
        Categorical Encoding={'OneHot' if encode_categoricals else 'None'},
        Numerical Scaling={'None' if not scale_numericals else scaler_type}.
        **Data Characteristics:**
        - Samples: {len(df_processed)}
        - Columns: {len(df_processed.columns)}
        """)
        bunch_dict = {
            "frame": df_processed, "data": data_array,
            "feature_names": bunch_feature_names,
            "target_names": target_names, "target": target_array,
            "DESCR": descr,
        }
        # Add coords only if requested AND present
        if include_coords:
            for coord in ('longitude', 'latitude'):
                if coord in df_processed:
                    bunch_dict[coord] = df_processed[coord].values
        return XBunch(**bunch_dict)

    # --- Generate sequences ---
    if verbose:
        print("\nReshaping processed data into sequences...")

    # One-hot encoded columns generated (or loaded from cache) previously
    encoded_cat_cols = []
    if encode_categoricals and encoder_info.get('columns'):
        for cols in encoder_info['columns'].values():
            encoded_cat_cols.extend(cols)

    # Static features: coordinates + encoded categoricals of the dataset
    final_static_cols = list(spatial_cols)
    final_static_cols.extend(
        c for c in encoded_cat_cols if c.startswith(static_prefixes))
    # Ensure no duplicates and that the columns exist
    final_static_cols = sorted(
        {c for c in final_static_cols if c in df_processed.columns})
    if verbose > 1:
        print(f" Final Static Cols: {final_static_cols}")

    # Dynamic features: the (already scaled) numerical columns
    final_dynamic_cols = sorted({
        c for c in numerical_cols
        if c in df_processed.columns and c != target_col
    })
    if verbose > 1:
        print(f" Final Dynamic Cols: {final_dynamic_cols}")

    # Future features: 'rainfall_mm', as per paper example
    final_future_cols = [
        c for c in ['rainfall_mm'] if c in df_processed.columns
    ]
    if not final_future_cols and dataset_name == 'zhongshan':
        warnings.warn("'rainfall_mm' required for future features based on"
                      " example, but not found in processed data.")
    if verbose > 1:
        print(f" Final Future Cols: {final_future_cols}")

    # Resolve grouping: an explicit list of columns is honoured, any other
    # truthy value means "group by the spatial columns", falsy disables it.
    if isinstance(group_by_cols, (list, tuple)):
        grouping_cols = list(group_by_cols) or None
    elif group_by_cols:
        grouping_cols = list(spatial_cols)
    else:
        grouping_cols = None

    # Check that required columns exist before calling reshape
    required_for_reshape = (
        [dt_col, target_col] + final_static_cols + final_dynamic_cols
        + final_future_cols + spatial_cols + (grouping_cols or [])
    )
    missing_in_processed = [
        c for c in sorted(set(required_for_reshape))
        if c not in df_processed.columns
    ]
    if missing_in_processed:
        raise ValueError(f"Columns missing for reshape_xtft_data after"
                         f" processing: {missing_in_processed}. Available:"
                         f" {df_processed.columns.tolist()}")

    # Call reshape_xtft_data on the fully processed DataFrame
    try:
        static_data, dynamic_data, future_data, target_data = reshape_xtft_data(
            df=df_processed,
            dt_col=dt_col,  # Use 'year' as the time index column
            target_col=target_col,
            static_cols=final_static_cols,
            dynamic_cols=final_dynamic_cols,
            future_cols=final_future_cols,
            spatial_cols=grouping_cols,
            time_steps=time_steps,
            forecast_horizon=forecast_horizon,
            verbose=verbose > 0
        )
    except ValueError as ve:
        required_len = time_steps + forecast_horizon
        # The original error text is embedded in the message, so the
        # exception context is deliberately suppressed below.
        msg = (
            f"{ve}\n\n"
            f"It looks like you're using the built-in {dataset_name} dataset (≈2000 samples),\n"
            f"but no (lon, lat) group has at least {required_len} time points.\n"
            "You have a few options:\n"
            " 1. Treat the entire dataset as one long sequence by passing\n"
            " `group_by_cols=None`.\n"
            " 2. Reduce `time_steps` or `forecast_horizon` so that each location\n"
            f" needs fewer than {required_len} data points.\n"
            " 3. Augment your city spatio-temporal data (e.g.\n"
            " via `fusionlab.utils.geo_utils.augment_city_spatiotemporal_data`)\n"
            " to create more samples per location.\n"
        )
        raise ValueError(msg) from None

    # Save sequences if requested
    if save_sequences:
        sequence_data_to_save = {
            'static_data': static_data, 'dynamic_data': dynamic_data,
            'future_data': future_data, 'target_data': target_data
        }
        try:
            joblib.dump(sequence_data_to_save, seq_fpath)
            if verbose:
                print(f"Saved sequences to: {seq_fpath}")
        except Exception as e:
            warnings.warn(f"Failed to save sequences: {e}")

    return static_data, dynamic_data, future_data, target_data
def _resolve_data_path(
data_home: Optional[str],
metadata: "RemoteMetadata",
data_name: str,
strategy: str,
verbose: int = 0
) -> Tuple[str, "RemoteMetadata"]:
"""Validates data path and resolves the final path to a raw data file.
This internal helper encapsulates the complex logic for finding the
correct dataset file. It provides a robust and flexible mechanism
that can handle a user-provided path as a direct file, a directory
to search, or fall back to the default package cache and download
mechanisms.
Parameters
----------
data_home : str or None
The user-provided path for locating the data. It can be:
- A direct path to a specific file (e.g., './data/my_data.csv').
- A path to a directory where the data file is located.
- ``None``, in which case the default fusionlab cache directory
is used.
metadata : RemoteMetadata
The default metadata object for the dataset, containing the
expected filename (``metadata.file``) and download URL
(``metadata.url``).
data_name : str
The name of the dataset (e.g., 'zhongshan'), used primarily for
logging and creating unique cache filenames.
strategy : {'load', 'fallback', 'generate'}
The loading strategy, which informs the error handling behavior
if a file cannot be found or downloaded.
verbose : int, default=0
The verbosity level for logging messages during the resolution
process.
Returns
-------
tuple of (str, RemoteMetadata)
A tuple containing:
- ``data_path_resolved``: The final, absolute path to the data
file that should be loaded.
- ``updated_metadata``: The metadata object, which may have been
updated with a new filename if a non-default file was used.
Raises
------
FileNotFoundError
If `data_home` points to an invalid directory or if a required
file cannot be found or downloaded when ``strategy='load'``.
ValueError
If `data_home` is a directory containing multiple potential data
files, creating an ambiguous situation.
Notes
-----
The function follows a strict resolution order to find the data:
1. If ``data_home`` is a direct path to a file, it is used, and the
default metadata is updated accordingly.
2. If ``data_home`` is a directory, the function first searches for
the default filename specified in ``metadata.file``.
3. If the default file is not found, it then scans the directory
for a **single, unique** alternative ``.csv`` or ``.xlsx`` file.
4. If no local file is found through the steps above, it falls back to
the default filename inside the searched directory (the default
fusionlab cache when ``data_home`` is ``None``) and attempts to
download the file from ``metadata.url`` if ``download_if_missing``
is enabled in the calling context.
See Also
--------
load_subsidence_pinn_data : The main user-facing function that uses this helper.
fusionlab.datasets._property.get_data : Utility to get the default cache path.
fusionlab.datasets._property.download_file_if : Utility to handle file downloads.
Examples
--------
>>> # Assume metadata.file = 'default_data.csv'
>>> # Case 1: data_home points directly to a file
>>> # path, meta = _resolve_data_path(
... # './my_data/custom.csv', metadata, ...
... # )
>>> # path would be '.../my_data/custom.csv'
>>> # meta.file would be 'custom.csv'
>>> # Case 2: data_home is a directory containing 'default_data.csv'
>>> # path, meta = _resolve_data_path(
... # './my_data/', metadata, ...
... # )
>>> # path would be '.../my_data/default_data.csv'
>>> # meta.file would remain 'default_data.csv'
>>> # Case 3: data_home is a directory with one other unique csv file
>>> # (and 'default_data.csv' is absent)
>>> # path, meta = _resolve_data_path(
... # './my_data_other/', metadata, ...
... # )
>>> # path would be '.../my_data_other/unique_file.csv'
>>> # meta.file would become 'unique_file.csv'
"""
from ..utils.generic_utils import ExistenceChecker
if data_home is None:
# Case 1: No data_home provided. Use default fusionlab cache.
resolved_home = get_data()
data_path_resolved = os.path.join(resolved_home, metadata.file)
if verbose >= 1:
logger.info(f"No `data_home` provided. Using default path:"
f" {data_path_resolved}")
else:
# Case 2: A data_home path is provided.
path_obj = Path(data_home)
if path_obj.is_file():
# Subcase 2a: data_home is a direct path to a file.
if verbose >= 1:
logger.info(f"Provided `data_home` is a file. Using it directly:"
f" {data_home}")
data_path_resolved = str(path_obj.resolve())
# Update metadata to reflect the user-provided file.
metadata = metadata._replace(
file=path_obj.name,
url=None # Local file has no download URL
)
return data_path_resolved, metadata
else:
# Subcase 2b: data_home is a directory. Validate and search it.
try:
resolved_home = str(ExistenceChecker.ensure_directory(path_obj))
except (TypeError, FileExistsError, OSError) as e:
raise FileNotFoundError(
f"The provided `data_home` directory '{data_home}' is "
f"invalid. Please check the path. Original error: {e}"
)
default_file_path = os.path.join(resolved_home, metadata.file)
if os.path.exists(default_file_path):
data_path_resolved = default_file_path
if verbose >= 1:
logger.info(f"Found default dataset '{metadata.file}'"
f" in `data_home`.")
else:
# Default file not found, search for other data files.
if verbose >= 2:
logger.debug(
f"Default file '{metadata.file}' not found in"
f" '{resolved_home}'. Searching for other data files..."
)
other_files = [
f for f in os.listdir(resolved_home)
if f.endswith(('.csv', '.xlsx')) and not f.startswith('~')
]
if len(other_files) == 1:
# Exactly one other file found, use it.
new_filename = other_files[0]
data_path_resolved = os.path.join(resolved_home, new_filename)
metadata = metadata._replace(file=new_filename, url=None)
if verbose >= 1:
logger.info(
f"Using auto-detected data file: {new_filename}")
elif len(other_files) > 1:
# Ambiguous situation.
raise ValueError(
f"Default data file '{metadata.file}' not found in "
f"'{resolved_home}', but multiple other data files were "
f"found: {other_files}. Ambiguous which one to use. "
"Please specify the full file path in `data_home`."
)
else:
# No files found, will proceed to download logic.
data_path_resolved = default_file_path
if verbose >= 2:
logger.debug(
"No other data files found. Will attempt"
" download if possible."
)
# --- Download Logic ---
# This block only acts when a download URL is set and `data_path_resolved` points to a file that doesn't exist.
try:
if metadata.url and not os.path.exists(data_path_resolved):
if verbose >= 1:
logger.info(
f"Data not found locally. Attempting to download {data_name} "
f"data from {metadata.url} to {data_path_resolved}..."
)
download_file_if( # This function should handle the download
metadata,
data_home=os.path.dirname(data_path_resolved)
)
except Exception as e:
if strategy == "load":
raise FileNotFoundError(
f"Failed to download or find required data file "
f"'{metadata.file}' for {data_name}: {e}"
) from e
logger.warning(
f"Could not download data for {data_name}: {e}. "
"Will proceed based on strategy."
)
return data_path_resolved, metadata