Source code for fusionlab.utils.spatial_utils

# -*- coding: utf-8 -*-
#   License: BSD-3-Clause
#   Author: LKouadio <etanoyau@gmail.com>
"""
geospatial_utils - A collection of utilities for geospatial and positional 
data analysis, filtering, and transformations.
"""
from __future__ import print_function, annotations 

import warnings
from numbers import Real
from scipy.spatial import cKDTree

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.patches import Patch, Circle
import seaborn as sns

from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
 
from ..api.types import (
    DataFrame, 
    Optional,
    Tuple, 
    Union, 
    List,
)
from ..compat.sklearn import validate_params, StrOptions, Interval 
from ..core.array_manager import extract_array_from 
from ..core.checks import ( 
    exist_features, 
    assert_ratio, 
    are_all_frames_valid, 
    check_spatial_columns
    )
from ..core.handlers import columns_manager, resolve_label  
from ..core.io import SaveFile, is_data_readable 
# from ..core.plot_manager import set_axis_grid  
from ..decorators import Deprecated, isdf 
from .generic_utils import find_id_column 
from .validator import ( 
    validate_positive_integer, 
    validate_length_range , 
    filter_valid_kwargs, 
    parameter_validator
    )

HAS_TQDM=True 
try: 
    from tqdm import tqdm 
except: 
    HAS_TQDM = False 
    
__all__ = [
     'spatial_sampling', 
     'extract_coordinates', 
     'batch_spatial_sampling', 
     'extract_zones_from', 
     'filter_position', 
     'create_spatial_clusters', 
     'gen_negative_samples', 
     'gen_buffered_negative_samples', 
     'gen_negative_samples_plus', 
     
 ]

@SaveFile 
@isdf  
def gen_negative_samples_plus(
    df: pd.DataFrame,
    target_col: str,
    spatial_cols: Tuple[str, str] = (
        'longitude',
        'latitude'
    ),
    feature_cols: Optional[List[str]] = None,
    buffer_km: float = 10,
    neg_feature_range: Tuple[float, float] = (
        0, 5
    ),
    num_neg_per_pos: int = 1,
    strategy: str = 'landslide', 
    gauge_data: Optional[pd.DataFrame] = None,
    elevation_data: Optional[pd.DataFrame] = None,
    similarity_features: Optional[
        List[str]
    ] = None,
    time_col: Optional[str] = None,
    cluster_method: str = 'kmeans',  
    use_gpd: Union[bool, str] = 'auto',
    id_col='auto',
    view: bool = False,
    savefile: Optional[str] = None,
    verbose: int = 1,
    seed: Optional[int] = None
) -> pd.DataFrame:
    """
    Generates negative samples for modeling in spatial scenarios,
    offering multiple strategies. The function calls
    `gen_buffered_negative_samples` when the ``strategy`` argument
    is ``'landslide'``, ``'event'``, or ``'gauge'``. It also calls
    `generate_negative_samples` partially in the ``'hybrid'``
    strategy. Internally, each sample is augmented to produce
    negative instances according to a chosen method.
    
    .. math::
       \\text{buffer_deg} = \\frac{\\text{buffer_km}}{111.0}
    
    The above formula approximates degrees from kilometers near
    the equator. The output is a combined dataset containing
    original positives and generated negatives. The ratio of
    negatives per positive is controlled by
    ``num_neg_per_pos``.
    
    Parameters
    ----------
    df : pandas.DataFrame
        Input data containing spatial coordinates and features.
    target_col : str
        Name of the classification target column. Positive
        samples in `df` are labeled, negatives will be generated.
    spatial_cols : tuple of str, optional
        Columns representing longitude and latitude in `df`.
    feature_cols : list of str, optional
        Additional feature columns used to drive or constrain
        negative sampling processes.
    buffer_km : float, optional
        Radius in kilometers for local negative sampling. Used
        to compute buffer degrees.
    neg_feature_range : tuple of float, optional
        Lower and upper range for continuous features in random
        negative generation.
    num_neg_per_pos : int, optional
        Number of negatives to generate per positive instance.
    strategy : str, optional
        Defines the sampling approach. Options include
        ``'landslide'``, ``'gauge'``, ``'random_global'``,
        ``'temporal_shift'``, ``'clustered_negatives'``,
        ``'environmental_similarity'``, ``'elevation_based'``,
        and ``'hybrid'``.
        
        See more in :ref:`User Guide <user_guide>`.
        
    gauge_data : pandas.DataFrame, optional
        Reference data for gauge-based or hybrid strategies.
    elevation_data : pandas.DataFrame, optional
        Elevation records, used if ``strategy='elevation_based'``.
    similarity_features : list of str, optional
        Columns for nearest neighbor computation in
        ``'environmental_similarity'``.
    time_col : str, optional
        Name of the time column for ``'temporal_shift'``. Required
        if using that strategy.
    cluster_method : str, optional
        Clustering algorithm for ``'clustered_negatives'``. Default
        is ``'kmeans'``.
    use_gpd : bool or str, optional
        Indicator for whether geopandas is used in buffer-based
        processes.
    id_col : str, optional
        Column name used as an identifier. If `'auto'`, a default is
        used.
    view : bool, optional
        Flag for visualizing or previewing the results.
    savefile : str, optional
        Path to save the output dataset. If None, no file is saved.
    verbose : int, optional
        Verbosity level. Higher values yield more logs.
    seed : int, optional
        Random seed for reproducibility. If None, randomness is not
        fixed.
    
    Returns
    -------
    pandas.DataFrame
        A combined DataFrame containing the original positive samples
        labeled as 1 and newly generated negative samples labeled 0.
    
    Notes
    -----
    When ``strategy='hybrid'``, partial sets of negatives come from
    two distinct calls to `generate_negative_samples` for
    ``'landslide'`` and ``'gauge'`` sub-strategies, then merged.
    
    Examples
    --------
    >>> from fusionlab.utils.spatial_utils import gen_negative_samples_plus
    >>> import pandas as pd
    >>> df_example = pd.DataFrame({{
    ...     "longitude": [10.1, 10.2, 10.3],
    ...     "latitude":  [45.1, 45.2, 45.3],
    ...     "feature":   [3.4, 2.1, 6.7],
    ...     "target":    [1, 1, 1]
    ... }})
    >>> gauge_data = pd.DataFrame({{
    ...     'gauge_id': ['G1', 'G2', 'G3'],
    ...     'latitude': np.random.uniform(24.0, 25.0, 3),
    ...     'longitude': np.random.uniform(113.0, 114.0, 3)
    ... }})
    >>> # Generate random global negatives
    >>> result = gen_negative_samples_plus(
    ...     df_example,
    ...     target_col="target",
    ...     strategy="random_global"
    ... )
    >>> print(result.head())
    
    See Also
    --------
    gen_buffered_negative_samples : Generates negative samples
        within a buffer region around reference events or gauges.
    generate_negative_samples : A simpler negative sampling
        utility for certain strategies.
    
    References
    ----------
    .. [1] P. Goovaerts, "Geostatistics for Natural Resources
       Evaluation," Oxford University Press, 1997.
    """

    # If the strategy is one of the recognized
    # types (landslide, event, gauge), we
    # directly call gen_buffered_negative_samples.
    if str(strategy).lower() in (
        'landslide',
        'event',
        'gauge'
    ):
        return gen_buffered_negative_samples(
            df=df,
            target_col=target_col,
            spatial_cols=spatial_cols,
            feature_cols=feature_cols,
            buffer_km=buffer_km,
            neg_feature_range=neg_feature_range,
            num_neg_per_pos=num_neg_per_pos,
            strategy=strategy,
            gauge_data=gauge_data,
            use_gpd=use_gpd,
            view=view,
            id_col=id_col,
            savefile=savefile,
            seed=seed, 
            verbose=verbose
        )

    # If not in the above categories, we proceed
    # with advanced strategies.
    np.random.seed(seed)

    # Validate columns and parameters via the
    # _validate_negative_sampling utility.
    checked_values = _validate_negative_sampling(
        df=df,
        target_col=target_col,
        spatial_cols=spatial_cols,
        feature_cols=feature_cols,
        neg_feature_range=neg_feature_range,
        num_neg_per_pos=num_neg_per_pos,
        verbose=verbose,
        id_col=id_col
    )
    (
        spatial_cols,
        feature_cols,
        neg_feature_range,
        num_neg_per_pos
    ) = checked_values

    # Extract column names for lon/lat.
    lon_col, lat_col = spatial_cols

    # Convert buffer kilometers to degrees,
    # approximating 1 deg ~ 111 km.
    buffer_deg = buffer_km / 111.0

    # Additional advanced strategy options.
    add_strategies = [
        'random_global',
        'temporal_shift',
        'clustered_negatives',
        'environmental_similarity',
        'elevation_based',
        'hybrid'
    ]

    # Validate the selected strategy from
    # the advanced list.
    strategy = parameter_validator(
        'strategy',
        target_strs=add_strategies,
        deep=True
    )(strategy)

    # Collect negative samples in a list.
    negatives = []

    # -----------------------
    # RANDOM_GLOBAL strategy:
    # Generate uniform random coordinates
    # across the bounding box of df.
    # -----------------------
    if strategy == 'random_global':
        lon_min, lon_max = (
            df[lon_col].min(),
            df[lon_col].max()
        )
        lat_min, lat_max = (
            df[lat_col].min(),
            df[lat_col].max()
        )
        for _ in range(len(df) * num_neg_per_pos):
            sample = {
                lon_col: np.random.uniform(
                    lon_min,
                    lon_max
                ),
                lat_col: np.random.uniform(
                    lat_min,
                    lat_max
                ),
                target_col: 0
            }
            for col in feature_cols:
                sample[col] = np.random.uniform(
                    *neg_feature_range
                )
            negatives.append(sample)

    # -----------------------
    # TEMPORAL_SHIFT strategy:
    # Shift time backward (e.g. 30 days)
    # and mark as negative samples.
    # -----------------------
    elif strategy == 'temporal_shift':
        if time_col is None:
            raise ValueError(
                "Time column must be provided "
                "for 'temporal_shift'."
            )
        df_shifted = df.copy()
        df_shifted[time_col] = pd.to_datetime(
            df_shifted[time_col]
        ) - pd.Timedelta(days=30)
        for _, row in df_shifted.iterrows():
            sample = row.to_dict()
            sample[target_col] = 0
            negatives.append(sample)

    # ---------------------------
    # CLUSTERED_NEGATIVES:
    # Cluster positive points and
    # sample negatives around the
    # resulting centroids. Two
    # clustering options are
    # supported: KMeans and DBSCAN.
    # ---------------------------
    elif strategy == 'clustered_negatives':
        coords = df[[lon_col, lat_col]].values
    
        # --- choose clustering backend
        if cluster_method.lower() == 'kmeans':
            clustering = KMeans(
                n_clusters=max(2, len(df) // 10),
                random_state=seed
            ).fit(coords)
            centers = clustering.cluster_centers_
    
        elif cluster_method.lower() == 'dbscan':
            clustering = DBSCAN(
                eps=buffer_deg * 2,     # search radius (deg)
                min_samples=5
            ).fit(coords)
            labels  = clustering.labels_
            centers = []
            for lbl in np.unique(labels):
                if lbl == -1:           # DBSCAN noise
                    continue
                pts = coords[labels == lbl]
                centers.append(pts.mean(axis=0))
            centers = np.asarray(centers)
    
        else:
            raise ValueError(
                "cluster_method must be either "
                "'KMeans' or 'DBSCAN'"
            )
        # --- generate negatives near each centroid
        for center in centers:
            for _ in range(num_neg_per_pos):
                lat_offset = np.random.uniform(
                    -buffer_deg, buffer_deg
                )
                lon_offset = np.random.uniform(
                    -buffer_deg, buffer_deg
                )
                sample = {
                    lat_col: center[1] + lat_offset,
                    lon_col: center[0] + lon_offset,
                    target_col: 0
                }
                for col in feature_cols:
                    sample[col] = np.random.uniform(
                        *neg_feature_range
                    )
                negatives.append(sample)
    
    # -----------------------
    # ENVIRONMENTAL_SIMILARITY:
    # Nearest neighbor approach on
    # specific feature vectors.
    # -----------------------
    elif strategy == 'environmental_similarity':
        if similarity_features is None:
            raise ValueError(
                "Specify similarity_features for "
                "'environmental_similarity'."
            )
        nn = NearestNeighbors(n_neighbors=1)
        nn.fit(df[similarity_features])
        for _ in range(len(df) * num_neg_per_pos):
            random_vector = np.random.uniform(
                0, 1,
                size=len(similarity_features)
            )
            dist, idx = nn.kneighbors([
                random_vector
            ])
            ref = df.iloc[idx[0][0]]
            sample = ref.copy()
            sample[target_col] = 0
            sample[lon_col] += np.random.uniform(
                -buffer_deg,
                buffer_deg
            )
            sample[lat_col] += np.random.uniform(
                -buffer_deg,
                buffer_deg
            )
            negatives.append(
                sample.to_dict()
            )

    # -----------------------
    # ELEVATION_BASED:
    # Pick negative samples from
    # low slope areas, etc.
    # -----------------------
    elif strategy == 'elevation_based':
        if elevation_data is None:
            raise ValueError(
                "Provide elevation data for "
                "'elevation_based' strategy."
            )
        low_slope_areas = elevation_data[
            elevation_data['slope'] < buffer_km
        ]
        sample_count = min(
            len(df) * num_neg_per_pos,
            len(low_slope_areas)
        )
        for _, row in low_slope_areas.sample(
            n=sample_count
        ).iterrows():
            sample = row.to_dict()
            sample[target_col] = 0
            negatives.append(sample)

    # -----------------------
    # HYBRID:
    # Combine partial sets of
    # negative samples from
    # different strategies.
    # -----------------------
    elif strategy == 'hybrid':
        part1 = gen_buffered_negative_samples(
            df,
            target_col,
            spatial_cols,
            feature_cols,
            buffer_km,
            neg_feature_range,
            num_neg_per_pos // 2,
            strategy='landslide',
            gauge_data=None,
            seed=seed,
            verbose=0
        )
        part2 = gen_buffered_negative_samples(
            df,
            target_col,
            spatial_cols,
            feature_cols,
            buffer_km,
            neg_feature_range,
            num_neg_per_pos // 2,
            strategy='gauge',
            gauge_data=gauge_data,
            seed=seed,
            verbose=0
        )
        combined = pd.concat([part1, part2], ignore_index=True)

        # Deduplicate based on spatial + feature + target columns
        dedup_cols = spatial_cols + (feature_cols or []) + [target_col]
        combined = combined.drop_duplicates(subset=dedup_cols)
    
        if verbose >= 1:
            print(
                "[INFO] Final dataset after deduplication:"
                f" {len(combined)} samples"
            )
            
        if view:
            _visualize_negative_sampling(
                df_combined=combined,
                base_points= df if strategy == 'landslide' else gauge_data,
                strategy=strategy,
                spatial_cols=spatial_cols,
                target_col=target_col,
                buffer_km=buffer_km, 
                title="Sample Generation via Hybrid Strategy",
            )
        return combined

    # Create DataFrame of negative samples.
    df_neg = pd.DataFrame(negatives)

    # Copy df for positive samples.
    df_pos = df.copy()
    df_pos[target_col] = 1

    # Combine positives and negatives.
    combined = pd.concat(
        [df_pos, df_neg],
        ignore_index=True
    )
    
    if view:
        _visualize_negative_sampling(
            df_combined=combined,
            base_points= df,
            strategy=strategy,
            spatial_cols=spatial_cols,
            target_col=target_col,
            buffer_km=buffer_km, 
        )
        
    # Optionally print info about the results.
    if verbose >= 1:
        print(
            f"[INFO] Generated {len(df_neg)} negative "
            f"samples using strategy '{strategy}'"
        )
        print(
            f"[INFO] Total dataset: {len(combined)} "
            "samples"
        )

    return combined

@SaveFile 
@isdf          
def gen_buffered_negative_samples(
    df: pd.DataFrame,
    target_col: str,
    spatial_cols: Tuple[str, str] = ('longitude', 'latitude'),
    feature_cols: Optional[List[str]] = None,
    buffer_km: float = 10,
    neg_feature_range: Tuple[float, float] = (0, 5),
    num_neg_per_pos: int = 1,
    strategy: str = 'landslide',  
    gauge_data: Optional[pd.DataFrame] = None,
    use_gpd: Union[bool, str] = 'auto',
    id_col='auto', 
    view: bool = False,
    savefile: Optional[str] = None,
    seed: Optional[int] = None, 
    verbose: int = 1,
) -> pd.DataFrame:
    
    """
    Generate buffer-based negative samples around existing
    points or gauge stations.
    
    This function creates additional negative samples
    for binary spatial events. It either takes an existing
    landslide dataset (when `strategy` is `'landslide'`)
    or a separate gauge dataset (if `strategy` is
    `'gauge'`) to serve as the base points for generating
    negatives within a circular buffer. The function
    validates input columns and parameters via
    `_validate_negatives_sampling` before constructing
    synthetic samples [1]_.
    
    Parameters
    ----------
    df : pandas.DataFrame
        The DataFrame containing positive event samples
        (e.g., landslides). Must include `<target_col>`
        and `<spatial_cols>`.
    target_col : str
        Name of the binary target column (1 for event,
        0 for no event).
    spatial_cols : tuple of str, default ('longitude', 'latitude')
        Indicates which columns hold `<longitude>` and
        `latitude` in `df`.
    feature_cols : list of str, optional
        Additional feature columns to simulate or copy
        for negatives. If ``None``, all columns except
        `<spatial_cols>` and `target_col` are used.
    buffer_km : float, default 10
        The radial distance in kilometers for sampling
        negative points around each base point.
    neg_feature_range : tuple of float, default (0, 5)
        A numeric range from which feature values are
        drawn for negative samples if the column
        is numeric.
    num_neg_per_pos : int, default 1
        Number of negatives to generate per positive
        (landslide) or gauge point.
    strategy : str, default 'landslide'
        Defines the base from which negative samples
        are generated:
          - `'landslide'` or ``'event'``: Use rows from `df`
            as base. 
          - `'gauge'`: Use rows from `<gauge_data>` as base.
    gauge_data : pandas.DataFrame, optional
        Required if `<strategy>` is `'gauge'`. Must
        contain `<spatial_cols>`.
    use_gpd : bool or 'auto', default 'auto'
        If `'auto'`, attempts to use GeoPandas for
        visualization if installed. Otherwise,
        falls back to Matplotlib. This parameter
        is forwarded to the underlying visualization
        function.
    id_col : str or list of str, default 'auto'
        Column(s) representing IDs in `<df>`. If
        `'auto'`, the function tries to detect
        possible ID columns. Used by
        `_validate_negatives_sampling`.
    view : bool, default False
        Whether to visualize the sampled negatives
        around the base points.
    savefile : str, optional
        If provided, saves the final combined dataset
        (positives and negatives) to a CSV file at the
        specified path.
    seed : int, optional
        Seed for NumPy's random generator, ensuring
        reproducible offsets in negative sampling.
    verbose : int, default 1
        Controls console messages: `1` for minimal,
        `2` for more detailed logs.

    Returns
    -------
    pandas.DataFrame
        The combined dataset containing both the original
        (positive) rows, labeled with `target_col`=1,
        and the newly generated negative rows, labeled
        `target_col`=0.
    
    Methods
    -------
    `_validate_negatives_sampling`
        Validates required columns and parameters,
        including `<num_neg_per_pos>` and
        `<neg_feature_range>`.
    `visualize_negative_sampling`
        Generates a plot showing the negative samples
        around the base points if `<view>` is True.
    
    Notes
    -----
    - If `strategy` is `'gauge'`, `gauge_data` must
      be provided and contain columns `longitude` and
      `latitude`.
    - When `<view>` is True, circles are drawn to
      illustrate the buffer radius.
    - The ratio of 1° ~ 111 km is approximate and can
      vary slightly by latitude [1]_.
      
    Formally, a buffer in degrees
    :math:`\\Delta` is computed by:
    
    .. math::
       \\Delta = \\frac{\\text{buffer\\_km}}{111},
    
    where :math:`111` is an approximate km-per-degree
    conversion factor. Each base point
    :math:`(lat, lon)` spawns :math:`n` negatives, each
    offset by :math:`\\delta_{lat}`, :math:`\\delta_{lon}`
    drawn from :math:`U(-\\Delta, \\Delta)`.
    
    
    Examples
    --------
    Below is an illustration of how to generate negative samples
    around both existing event locations (strategy=`landslide`)
    and separate gauge stations (strategy=`gauge`) using
    ``gen_buffered_negative_samples``.
    
    First, we simulate a small DataFrame of positive
    landslide samples with rainfall attributes, as well
    as a separate DataFrame for gauge stations:
    
    >>> import numpy as np
    >>> import pandas as pd
    >>> np.random.seed(42)
    
    >>> positive_samples = pd.DataFrame({
    ...     'id': [1, 2, 3, 4, 5],
    ...     'latitude': np.random.uniform(24.0, 25.0, 5),
    ...     'longitude': np.random.uniform(113.0, 114.0, 5),
    ...     'rainfall_day_1': np.random.randint(10, 30, 5),
    ...     'rainfall_day_2': np.random.randint(10, 30, 5),
    ...     'rainfall_day_3': np.random.randint(10, 30, 5),
    ...     'rainfall_day_4': np.random.randint(10, 30, 5),
    ...     'rainfall_day_5': np.random.randint(10, 30, 5),
    ...     'landslide': [1]*5
    ... })
    
    >>> gauge_data = pd.DataFrame({
    ...     'gauge_id': ['G1', 'G2', 'G3'],
    ...     'latitude': np.random.uniform(24.0, 25.0, 3),
    ...     'longitude': np.random.uniform(113.0, 114.0, 3)
    ... })
    
    We then call ``gen_buffered_negative_samples`` to
    produce negatives around these data using two
    different strategies:
    
    >>> from fusionlab.utils.spatial_utils import gen_buffered_negative_samples
    
    >>> # Generate negatives around landslide points
    >>> results_landslide = generate_negative_samples_with(
    ...     df=positive_samples,
    ...     target_col='landslide',
    ...     spatial_cols=('longitude', 'latitude'),
    ...     feature_cols=[f'rainfall_day_{i+1}' for i in range(5)],
    ...     buffer_km=10,
    ...     num_neg_per_pos=1,
    ...     strategy='landslide',
    ...     verbose=1
    ... )
    
    >>> # Generate negatives around the gauge stations
    >>> results_gauge = gen_buffered_negative_samples(
    ...     df=positive_samples,
    ...     target_col='landslide',
    ...     spatial_cols=('longitude', 'latitude'),
    ...     feature_cols=[f'rainfall_day_{i+1}' for i in range(5)],
    ...     buffer_km=10,
    ...     num_neg_per_pos=1,
    ...     strategy='gauge',
    ...     gauge_data=gauge_data,
    ...     verbose=1
    ... )

    
    See Also
    --------
    generate_negative_samples: 
        Generate synthetic negative samples for spatial 
        binary classification tasks.
    _validate_negatives_sampling: 
        Ensures inputs and parameters are correct.
    visualize_negative_sampling : 
        Plots the positive and negative points for inspection.
    
    References
    ----------
    .. [1] "What is a degree of Latitude/Longitude?"
           US National Geodetic Survey (NGS),
           https://www.ngs.noaa.gov/.
    """
    # See for reproducibility 
    strategy = ( 
        'landslide' if strategy.lower() in ('landslide','event')
        else strategy
    )
    np.random.seed(seed)

    # Validate input and columns
    checked_values = _validate_negative_sampling (
        df=df, 
        target_col=target_col, 
        spatial_cols= spatial_cols, 
        feature_cols= feature_cols, 
        neg_feature_range=neg_feature_range, 
        num_neg_per_pos= num_neg_per_pos, 
        verbose=verbose, 
        id_col=id_col, 
        )
    ( 
     spatial_cols, feature_cols,
     neg_feature_range, num_neg_per_pos
     ) = checked_values  
    
    lon_col, lat_col = spatial_cols 

    if verbose >= 1:
        print(f"[INFO] Generating negative samples using strategy: {strategy}")

    buffer_deg = buffer_km / 111.0
    negatives = []
    
    if strategy == 'gauge':
        if gauge_data is None:
            raise ValueError(
                "Gauge data must be provided for 'gauge' strategy."
                )

    base_points = df if strategy == 'landslide' else gauge_data

    if (base_points is None
       or not all(c in base_points.columns for c in spatial_cols)):
        raise ValueError(
            "Missing gauge data or invalid spatial columns."
        )

    for _, point in base_points.iterrows():
        for _ in range(num_neg_per_pos):
            lat_offset = np.random.uniform(
                -buffer_deg,
                buffer_deg
            )
            lon_offset = np.random.uniform(
                -buffer_deg,
                buffer_deg
            )
            new_lat = point[lat_col] + lat_offset
            new_lon = point[lon_col] + lon_offset

            sample = {
                lat_col: new_lat,
                lon_col: new_lon,
                target_col: 0
            }

            for col in feature_cols:
                sample[col] = np.random.uniform(
                    *neg_feature_range
                )

            negatives.append(sample)

    df_neg = pd.DataFrame(negatives)
    df_pos = df.copy()
    df_pos[target_col] = 1

    combined = pd.concat(
        [df_pos, df_neg],
        ignore_index=True
    )

    if verbose >= 1:
        print(f"[INFO] Generated {len(df_neg)} negative samples")
        print(f"[INFO] Total dataset: {len(combined)} samples")

    if view:
        _visualize_negative_sampling(
            df_combined=combined,
            base_points=base_points,
            strategy=strategy,
            spatial_cols=spatial_cols,
            target_col=target_col,
            buffer_km=buffer_km
        )

    return combined

def _visualize_negative_sampling(
        df_combined, base_points, strategy, spatial_cols, 
        target_col, buffer_km=10, s=50, title=None, 
        ):
    

    lon_col, lat_col = spatial_cols
    buffer_deg = buffer_km / 111.0

    fig, ax = plt.subplots(figsize=(10, 8))

    colors = {1: 'red', 0: 'green'}
    labels = {1: 'Positive', 0: 'Negative'}

    for val in [1, 0]:
        subset = df_combined[df_combined[target_col] == val]
        ax.scatter(subset[lon_col], subset[lat_col],
                   color=colors[val], label=f"{labels[val]} Samples", s=s)

    for _, row in base_points.iterrows():
        circle = Circle(
            (row[lon_col], row[lat_col]),
            buffer_deg,
            color='blue',
            alpha=0.2
        )
        ax.add_patch(circle)

    if strategy == 'gauge':
        ax.scatter(base_points[lon_col], base_points[lat_col], 
                   color='blue', label='Gauges',
                   marker='x', s=+20
                   )
    if strategy == 'clustered_negatives':
        ax.scatter(base_points[lon_col], base_points[lat_col],
                   color='blue', label='Cluster Centers',
                   marker='P', s=s +30
                   )
        
    ax.set_xlabel("Longitude")
    ax.set_ylabel("Latitude")
    ax.set_title(title or f"Negative Sample Generation Strategy: {strategy}")
    ax.legend(handles=[
        Patch(color='red', label='Positive Samples'),
        Patch(color='green', label='Negative Samples'),
        Patch(color='blue', alpha=0.2, label=f'{buffer_km} km Buffer')
    ])
    
    ax.grid (True, **{"linestyle":":", "alpha": 0.7})

    plt.tight_layout()
    plt.show()

@SaveFile
@isdf
def gen_negative_samples(
    df: DataFrame,
    target_col: str,
    spatial_cols: Tuple [str, str]=('longitude', 'latitude'),
    feature_cols: Optional[List[str]]=None,
    buffer_km: float=10,
    neg_feature_range: Tuple[float, float]=(0, 5),
    num_neg_per_pos: int=1,
    use_gpd: Union [bool, str]='auto',
    view: bool=False,
    savefile: Optional[str] = None, 
    verbose: int=1
):
    r"""
    Generate synthetic negative samples for spatial binary
    classification tasks.
    
    This function creates additional samples labeled as
    non-events within a specified spatial buffer around
    the positive (event) observations. The main idea is to
    generate negative examples that reflect realistic
    conditions but have not triggered an event, thereby
    assisting models in distinguishing occurrences from
    non-occurrences [1]_.
    
    Parameters
    ----------
    df : pandas.DataFrame
        Input DataFrame containing the positive samples
        (events). Must include both the target column
        and the specified spatial columns.
    target_col : str
        Column name for the binary target (e.g.
        `landslide`). Rows where this column is
        1 (or True) are considered positive samples.
    spatial_cols : tuple of str, default ('longitude', 'latitude')
        Tuple specifying the longitude> and latitude
        column names in `df`.
    feature_cols : list of str or None, default None
        List of feature columns to use or to simulate
        for generated negatives. If ``None``, the
        function automatically detects numeric and
        categorical columns excluding `spatial_cols`
        and `target_col`.
    buffer_km : float, default 10
        Spatial buffer in kilometers used to define
        the radius around each positive sample
        within which negative samples are created.
    neg_feature_range : tuple of int, default (0, 5)
        Value range (minimum, maximum) used for
        simulating numeric feature values in negative
        samples if the corresponding feature column
        does not exist in `<df>`.
    num_neg_per_pos : int, default 1
        Number of negative samples to generate
        per positive sample. For instance, if
        ``num_neg_per_pos=2``, each positive sample
        spawns two negatives.
    use_gpd : str, default 'auto'
        If set to 'auto', the function tries to
        import GeoPandas for visualization. If
        `'none'`, no GeoPandas usage will occur.
    view : bool, default False
        Whether to visualize the generated samples
        on a map. Attempts to use `geopandas` if
        installed; falls back to `matplotlib` if
        `'auto'` is chosen and GeoPandas is not
        available.
    savefile : str or None, default None
        Path to which the resulting DataFrame is
        saved if provided. Handled by the decorator
        that wraps this function.
    verbose : int, default 1
        Verbosity level. `0` for silent,
        `1` for progress indication, `2` for
        more messages, `3` for debugging output.
    
    Returns
    -------
    pandas.DataFrame
        Combined DataFrame with both original positive
        samples and newly generated negative samples. The
        `<target_col>` is 1 for positives and 0 for negatives.
    
    Methods
    -------
    `columns_manager`
        This internal function is used to handle the
        processing of columns for features and
        spatial parameters.
    
    Notes
    -----
    - If a feature column exists in `df`, the negative
      samples will copy or randomly select categories for
      categorical columns, and sample integers within
      ``neg_feature_range`` for numeric columns.
    - If `feature_cols` is empty or does not exist
      in `df`, the function simulates all values for
      negative samples.
    - When `view=True`, circles depicting the buffer
      zone around each positive sample are drawn for
      visualization.
    - The approximation of 1° ~ 111 km varies slightly
      depending on latitude [1]_.
    
    Mathematically, we define the spatial buffer in degrees
    as:
    
    .. math::
       \\Delta = \\frac{\\text{buffer_km}}{111.0},
    
    where :math:`111.0` km approximates the distance of
    one degree of latitude or longitude [1]_. For each
    positive sample at location :math:`(lat, lon)`,
    we generate :math:`n` new points with offsets
    :math:`\\delta_{lat}` and :math:`\\delta_{lon}`, each
    drawn from a uniform distribution
    :math:`U(-\\Delta, \\Delta)`:
    
    .. math::
       \\begin{aligned}
       &lat_{new} = lat + \\delta_{lat},\\\\
       &lon_{new} = lon + \\delta_{lon}.
       \\end{aligned}
    
    Combined with randomly sampled or inferred feature
    values, these new samples serve as negative examples
    for modeling tasks such as landslide prediction.
    
    Examples
    --------
    >>> from fusionlab.utils.spatial_utils import gen_negative_samples
    >>> import pandas as pd
    >>> import numpy as np
    >>> df_pos = pd.DataFrame({
    ...        'latitude': np.random.uniform(24.0, 25.0, 5),
    ...        'longitude': np.random.uniform(113.0, 114.0, 5),
    ...        'rainfall_day_1': np.random.randint(10, 30, 5),
    ...        'rainfall_day_2': np.random.randint(10, 30, 5),
    ...        'rainfall_day_3': np.random.randint(10, 30, 5),
    ...        'rainfall_day_4': np.random.randint(10, 30, 5),
    ...        'rainfall_day_5': np.random.randint(10, 30, 5),
    ...        'landslide': 1
    ...    })
    >>> combined = gen_negative_samples(
    ...     df=df_pos,
    ...     target_col='landslide',
    ...     buffer_km=10,
    ...     num_neg_per_pos=2,
    ...     view=False,
    ...     verbose=2
    ... )
    >>> print(combined.head())
    
    See Also
    --------
    check_spatial_columns : Ensures the existence of
                              required spatial columns.
    exist_features : Verifies the presence of
                       specified features in `<df>`.
    columns_manager : Handles both feature and spatial
                        columns for processing.
    
    References
    ----------
    .. [1] US National Geodetic Survey (NGS). "What is a
           degree of Latitude/Longitude?"
           (https://www.ngs.noaa.gov/).
    """
    # Validate input and columns
    checked_values = _validate_negative_sampling (
        df=df, 
        target_col=target_col, 
        spatial_cols= spatial_cols, 
        feature_cols= feature_cols, 
        neg_feature_range=neg_feature_range, 
        num_neg_per_pos= num_neg_per_pos, 
        verbose=verbose 
        )
    ( 
     spatial_cols, feature_cols,
     neg_feature_range, num_neg_per_pos
     ) = checked_values  
    
    lon_col, lat_col = spatial_cols 
    
    # Check GeoPandas
    HAS_GPD = False
    try:
        import geopandas as gpd
        from shapely.geometry import Point
        HAS_GPD = True
    except ImportError:
        HAS_GPD = False

    # Buffer conversion (km -> deg)
    buffer_deg = buffer_km / 111.0

    if verbose >= 2:
        print(f"[INFO] Generating negative samples within "
              f"{buffer_km} km buffer zones.")
    if verbose >= 3:
        print(f"[DEBUG] Buffer in degrees: {buffer_deg:.4f}")

    # Generate negative samples per row
    negative_samples = []
    iterator = df.iterrows()
    if verbose >= 1 and HAS_TQDM:
        iterator = tqdm(
            iterator,
            total=len(df),
            desc="Generating negatives"
        )

    for _, row in iterator:
        for _ in range(num_neg_per_pos):
            lat_offset = np.random.uniform(-buffer_deg,
                                           buffer_deg)
            lon_offset = np.random.uniform(-buffer_deg,
                                           buffer_deg)
            new_lat = row[lat_col] + lat_offset
            new_lon = row[lon_col] + lon_offset

            sample = {
                lat_col: new_lat,
                lon_col: new_lon,
                target_col: 0
            }

            # Simulate or copy feature columns for negs
            for col in feature_cols:
                # Categorical features
                if (df[col].dtype == 'object'
                   or df[col].dtype.name == 'category'):
                    unique_vals = (df[col].dropna()
                                      .unique())
                    if len(unique_vals) > 0:
                        sample[col] = np.random.choice(
                                          unique_vals
                                       )
                    else:
                        sample[col] = None
                # Numeric features
                elif np.issubdtype(df[col].dtype,
                                   np.number):
                    sample[col] = np.random.randint(
                        neg_feature_range[0],
                        neg_feature_range[1] + 1
                    )
                else:
                    sample[col] = None

            negative_samples.append(sample)

    df_negative = pd.DataFrame(negative_samples)

    # Label positive set and combine
    df_positive = df.copy()
    df_positive[target_col] = 1

    combined = pd.concat([df_positive, df_negative],
                         ignore_index=True)

    if verbose >= 2:
        print(f"\n[INFO] Final dataset: {len(df_positive)} "
              f"positive and {len(df_negative)} negative "
              f"samples.")


    # Optional View/Plot
    if view:
        if (use_gpd == 'auto') and HAS_GPD:
            if verbose >= 2:
                print("[INFO] Visualizing with GeoPandas.")

            geometry = [
                Point(xy) for xy in zip(
                    combined[lon_col],
                    combined[lat_col]
                )
            ]
            gdf = gpd.GeoDataFrame(combined,
                                   geometry=geometry)

            fig, ax = plt.subplots(figsize=(10, 8))
            for label, color, name in zip(
                [1, 0],
                ['red', 'green'],
                ['Positive', 'Negative']
            ):
                gdf[gdf[target_col] == label].plot(
                    ax=ax,
                    color=color,
                    label=f"{name} Sample",
                    markersize=50
                )

            for _, row in df_positive.iterrows():
                circle = plt.Circle(
                    (row[lon_col], row[lat_col]),
                    buffer_deg,
                    color='blue',
                    alpha=0.2
                )
                ax.add_patch(circle)

        else:
            if verbose >= 2:
                print("[INFO] Visualizing with Matplotlib.")

            fig, ax = plt.subplots(figsize=(10, 8))
            for label, color, name in zip(
                [1, 0],
                ['red', 'green'],
                ['Positive', 'Negative']
            ):
                subset = combined[combined[target_col]
                                  == label]
                ax.scatter(subset[lon_col],
                           subset[lat_col],
                           color=color,
                           label=f"{name} Sample",
                           s=50)

            for _, row in df_positive.iterrows():
                circle = plt.Circle(
                    (row[lon_col], row[lat_col]),
                    buffer_deg,
                    color='blue',
                    alpha=0.2
                )
                ax.add_patch(circle)

        ax.set_title("Negative Sample Generation with "
                     "Spatial Buffers")
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.legend(handles=[
            Patch(color='red', label='Positive Samples'),
            Patch(color='green', label='Negative Samples'),
            Patch(color='blue', alpha=0.2,
                  label=f"{buffer_km} km Buffer")
        ])
        ax.grid (True, **{"linestyle":":", "alpha": 0.7})
        plt.tight_layout()
        plt.show()

    return combined

def _validate_negative_sampling(
    df: DataFrame,
    target_col: str,
    spatial_cols: Tuple[str, str] = ('longitude', 'latitude'),
    feature_cols: Optional[List[str]] = None,
    neg_feature_range: Tuple[float, float] = (0, 5),
    num_neg_per_pos: int = 1,
    id_col: Optional [str]="auto",
    verbose: int=0
):
    """
    Helper that validates input parameters and column configurations
    for the generation of negative samples. Ensures required columns exist,
    processes feature and spatial columns, and checks numeric parameters
    like `neg_feature_range` and `num_neg_per_pos`.
    """
    # Validate input and columns
    exist_features(df,features=target_col,
                   name=f"Target '{target_col}'")

    feature_cols = columns_manager(feature_cols)
    spatial_cols = columns_manager(spatial_cols)
 
    check_spatial_columns(df, spatial_cols=spatial_cols)
    exist_features(df,features=spatial_cols,
                   name="Spatial columns")
    neg_feature_range= validate_length_range(
        neg_feature_range, 
        param_name="neg_feature_range"
    )
    num_neg_per_pos= validate_positive_integer(
        num_neg_per_pos, 
        "num_neg_per_pos", 
    )
    # detect id columns if auto 
    if str(id_col).lower() =='auto': 
        id_col = find_id_column(
            df, strategy= 'naive', 
            errors='ignore'
        )
    
    id_col= columns_manager( id_col, empty_as_none= False ) 
    # If no feature_cols provided, auto-detect from remaining columns
    # including 
    if feature_cols is None:
        excluded_cols = list(spatial_cols) + [target_col] + id_col
        feature_cols = [
            col for col in df.columns
            if col not in excluded_cols
        ]
        if verbose >= 2:
            print(f"[INFO] Auto-detected feature_cols:"
                  f" {feature_cols}")

    feature_cols = columns_manager(
        feature_cols,
        empty_as_none=False
    )

    if len(feature_cols) == 0:
        raise ValueError("No feature columns found. Please "
                         "specify `feature_cols` explicitly.")

    # Warn if a feature column does not exist in df
    for col in feature_cols:
        if col not in df.columns and verbose >= 2:
            print(f"[WARN] Feature column '{col}' not "
                  "found in df; it will be simulated.")
            
    return ( 
        spatial_cols, 
        feature_cols,
        neg_feature_range, 
        num_neg_per_pos, 
     )

[docs] @SaveFile @isdf def create_spatial_clusters( df: pd.DataFrame, spatial_cols: Optional[List[str]] = None , cluster_col: str = 'region', n_clusters: Optional[int] = None, algorithm: str = 'kmeans', view: bool = True, figsize: tuple = (14, 10), s: int=60, plot_style: str = 'seaborn', cmap: str = 'tab20', show_grid: bool=True, grid_props: dict =None, auto_scale: bool = True, savefile: Optional[str]=None, verbose: int = 1, **kwargs ) -> pd.DataFrame: """ Cluster 2D spatial data in ``df`` using `<algorithm>` and optionally plot the results. This function, `<create_spatial_clusters>`, extracts two coordinate columns from `<df>` to form clusters via methods such as 'kmeans', 'dbscan', or 'agglo' (agglomerative). It uses the function `filter_valid_kwargs` (when relevant) to strip out invalid parameters for certain estimators, and writes cluster labels into `<cluster_col>`. Parameters ---------- df : pandas.DataFrame Input DataFrame holding spatial coordinates and optional other fields. spatial_cols : list of str, optional Two-column list for x and y coordinates. Defaults to ``['longitude','latitude']`` if None. cluster_col : str, default='region' Name of the column to store the assigned cluster labels. n_clusters : int, optional Number of clusters to form. If not provided for KMeans, it is auto-detected. For DBSCAN or Agglomerative, a warning is issued if not set. algorithm : str, default='kmeans' Choice of clustering algorithm among ``['kmeans','dbscan','agglo']``. view : bool, default=True If True, displays a scatterplot of the final clusters. figsize : tuple, default=(14, 10) Size of the displayed figure for the cluster plot. s : int, default=60 Marker size in the scatterplot. plot_style : str, default='seaborn' Matplotlib style used for the plot. cmap : str, default='tab20' Colormap name used to differentiate clusters. show_grid : bool, default=True Toggles grid lines on or off. grid_props : dict, optional Additional keyword arguments controlling the grid style. auto_scale : bool, default=True If True, standardize coordinates before clustering. savefile : str, optional File path to save the data with an additional `<cluster_col>` storing the assigned cluster labels if desired. verbose : int, default=1 Controls console logs. Higher values yield more details about scaling and cluster detection. **kwargs Additional keyword arguments passed to the chosen algorithm (filtered by `filter_valid_kwargs` for KMeans, DBSCAN, AgglomerativeClustering ). Returns ------- pandas.DataFrame A copy of `<df>` with an additional `<cluster_col>` storing the assigned cluster labels. Notes ----- If `<auto_scale>` is True, it uses a standard scaler to normalize the coordinate columns. The scatterplot is generated using the library seaborn for enhanced styling. By default, for `<algorithm>` = "kmeans", the model attempts to minimize: .. math:: J = \\sum_{i=1}^{N} \\min_{\\mu_j} \\lVert x_i - \\mu_j \\rVert^2 where :math:`x_i` are the scaled or raw 2D coordinates in `<df>`. The function can optionally auto-detect ``n_clusters`` using a silhouette and elbow analysis if not provided. Examples -------- >>> from fusionlab.utils.spatial_utils import create_spatial_clusters >>> import pandas as pd >>> df = pd.DataFrame({ ... "longitude": [0.1, 0.2, 2.2, 2.3], ... "latitude": [1.0, 1.1, 2.1, 2.2] ... }) >>> # KMeans with auto scale and auto-detect k >>> result = create_spatial_clusters( ... df=df, ... algorithm="kmeans", ... view=True ... ) >>> # DBSCAN with custom arguments >>> result_db = create_spatial_clusters( ... df=df, ... algorithm="dbscan", ... eps=0.5, ... min_samples=2 ... ) See Also -------- filter_valid_kwargs : Helps discard unsupported keyword arguments for chosen estimators. References ---------- .. [1] Pedregosa et al. *Scikit-learn: Machine Learning in Python*, JMLR 12, pp. 2825-2830, 2011. """ # Confirm required columns exist in DataFrame # This prevents missing key data issues if spatial_cols is None: spatial_cols = ['longitude', 'latitude'] assert all(col in df.columns for col in spatial_cols), ( "Missing spatial columns" ) assert len(spatial_cols) == 2, ( f"Need exactly 2 spatial columns. Got {len(spatial_cols)}" ) assert algorithm in ['kmeans', 'dbscan', 'agglo'], ( "Invalid algorithm. Expect one of ['kmeans', 'dbscan', 'agglo']." ) # Use requested plotting style plt.style.use(plot_style) # Copy DataFrame to avoid modification local_df = df.copy() # Extract coordinates from the spatial columns coords = local_df[spatial_cols].values # Debug info about data shape if verbosity is high if verbose >= 2: print(f"DataFrame shape: {local_df.shape}") print("Initial coords sample:", coords[:5]) # Scale coordinates to standardize range, if requested if auto_scale: if verbose >= 1: print("Scaling coordinates...") scaler = StandardScaler() coords = scaler.fit_transform(coords) # Debug info about scaled coords if verbosity is high if verbose >= 2: print("Scaled coords sample:", coords[:5]) # Determine an optimal number of clusters if none provided if n_clusters is None: if algorithm == 'kmeans': if verbose >= 1: print("Auto-detecting optimal number of clusters...") n_clusters = _auto_detect_k( coords=coords, verbose=verbose, show_grid=show_grid, grid_props=grid_props ) else: # Warn if user expects auto-detection with non-kmeans warnings.warn( "Auto-cluster detection only supported for KMeans" ) # Notify user about the clustering approach if verbose >= 1: print(f"Clustering with {algorithm.upper()}...") # Initialize the selected clustering algorithm if algorithm == 'kmeans': kwargs = filter_valid_kwargs(KMeans, kwargs) clusterer = KMeans( n_clusters=n_clusters, random_state=42, **kwargs ) elif algorithm == 'dbscan': kwargs = filter_valid_kwargs (DBSCAN, kwargs) clusterer = DBSCAN(**kwargs) else: # algorithm == 'agglo' kwargs = filter_valid_kwargs (AgglomerativeClustering, kwargs) clusterer = AgglomerativeClustering( n_clusters=n_clusters, **kwargs ) # Fit the model and predict cluster labels labels = clusterer.fit_predict(coords) local_df[cluster_col] = labels # Debug info about clustering output if verbosity is high if verbose >= 2: unique_labels = np.unique(labels) print(f"Unique cluster labels: {unique_labels}") # Plot the clusters if requested if view: _plot_clusters( df=local_df, spatial_cols=spatial_cols, cluster_col=cluster_col, figsize=figsize, cmap=cmap, algorithm=algorithm, s=s, show_grid=show_grid, grid_props=grid_props, ) # Return the DataFrame with assigned cluster labels return local_df
def _auto_detect_k( coords: np.ndarray, verbose: int, max_k: int = 10, show_grid: bool=True, grid_props: dict=None, ) -> int: # Evaluate multiple k values using # elbow (distortion) and silhouette scores distortions = [] silhouettes = [] K_range = range(2, max_k + 1) if verbose >= 1: print(f"Evaluating k from 2 to {max_k}...") for k in K_range: kmeans = KMeans(n_clusters=k, random_state=42, n_init='auto') kmeans.fit(coords) distortions.append(kmeans.inertia_) silhouettes.append( silhouette_score(coords, kmeans.labels_) ) # Detailed iteration log if verbosity is high if verbose >= 2: output = "k={0:<5} | Distortion={1:^28} | Silhouette={2:^20}".format( k, f'{distortions[-1]:.3f}', f'{silhouettes[-1]:.3f}') print(output) # Plot elbow and silhouette analyses fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5)) ax1.plot(K_range, distortions, 'bo-') ax1.set( xlabel='Number of clusters', ylabel='Distortion', title='Elbow Method' ) ax2.plot(K_range, silhouettes, 'go-') ax2.set( xlabel='Number of clusters', ylabel='Silhouette Score', title='Silhouette Analysis' ) # Enable gridlines if grid_props is None: grid_props= {"linestyle": ':', 'alpha': .7} if show_grid: ax1.grid(show_grid, **grid_props) else: ax1.grid(show_grid) if show_grid: ax2.grid(show_grid, **grid_props) else: ax2.grid(show_grid) plt.tight_layout() plt.show() # Use silhouette score peak to suggest optimal k optimal_k = np.argmax(silhouettes) + 2 if verbose >= 1: print(f"Suggested optimal k: {optimal_k}") return optimal_k def _plot_clusters( df: pd.DataFrame, spatial_cols: List[str], cluster_col: str, figsize: tuple, cmap: str, s: int, algorithm: str, show_grid: bool=True, grid_props: dict=None, ) -> None: # Create a scatterplot to visualize clustered data plt.figure(figsize=figsize) ax = sns.scatterplot( x=spatial_cols[0], y=spatial_cols[1], hue=cluster_col, palette=cmap, data=df, s=s, edgecolor='k', linewidth=0.5, alpha=0.8, legend='auto' ) # Professional labeling and presentation plt.title( f"{algorithm.upper()} Clustering - " f"{df[cluster_col].nunique()} Clusters", fontsize=14, pad=20 ) plt.xlabel(spatial_cols[0], fontsize=12) plt.ylabel(spatial_cols[1], fontsize=12) # Enable gridlines if show_grid: plt.grid(show_grid, **( grid_props or {"linestyle": ':', 'alpha': .7})) else: plt.grid(show_grid) # Adjust and position legend plt.legend( title='Cluster', bbox_to_anchor=(1.05, 1), loc='upper left' ) # Annotate cluster labels near their median positions for cluster in df[cluster_col].unique(): # Skip noise cluster (-1) for DBSCAN if cluster == -1: continue median_position = df[df[cluster_col] == cluster][ spatial_cols ].median() plt.text( median_position[0], median_position[1], str(cluster), fontdict=dict(weight='bold', size=10), bbox=dict( facecolor='white', alpha=0.8, edgecolor='none' ) ) # Hide top/right spines, tighten layout ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.figure.tight_layout() # Display the final clustering plot plt.show() @validate_params ({ 'threshold':[Interval(Real, 0, 1, closed='neither')], 'error': [StrOptions({'raise', 'warn', 'ignore'})], 'pos': [Real, 'array-like'] }) @isdf def filter_position( df, pos, pos_cols=None, find_closest=True, threshold=0.01, error='raise', verbose=0 ): """ filter_position is a utility that filters a pandas.DataFrame based on user-specified spatial positions. It can match positions exactly or compute distances to find the closest points within a threshold. For a single dimension, the distance is computed as: .. math:: d = |x - p| For multi-dimensional data with n coordinates, the Euclidean distance is computed as: .. math:: d = \\sqrt{\\sum_{i=1}^n (x_i - p_i)^2} Parameters ------------ df : pandas.DataFrame The DataFrame that will be filtered. This parameter is essential and must contain columns referenced by `pos_cols` if ``pos_cols`` is not None. pos : float or tuple of floats The reference position(s) to match or approximate. When `pos_cols` is None, `pos` is interpreted as an index value. Otherwise, each value in `pos` aligns with a specific column in `pos_cols`. pos_cols : str or tuple of str, optional Name(s) of the column(s) in `df` to match against `pos`. If ``pos_cols=None``, then `pos` is treated as an index filter. If multiple columns are given (e.g., latitude and longitude), each coordinate in `pos` should correspond to one column in `pos_cols`. find_closest : bool, optional If True, nearest-neighbor filtering is performed within the distance `threshold`. If False, exact matches are used. threshold : float, optional The maximum distance within which points are considered a match if `find_closest` is True. The unit corresponds to the column data (e.g., degrees for geographic lat/lon). error : {'raise', 'warn', 'ignore'}, optional Specifies how to handle dimension mismatches or missing values. If ``'raise'``, a ValueError will be raised. If ``'warn'``, a warning is printed and extra values are ignored. If ``'ignore'``, mismatches are silently ignored. verbose : int, optional Controls the level of output messages: - 0: No output - 1: Basic info - 2: Additional details - >=3: Comprehensive summary Returns ------- pandas.DataFrame A new DataFrame that contains only rows matching or approximating the specified position(s) within the given threshold if `find_closest` is True. Notes ----- When `pos_cols` is None, the function attempts to filter by DataFrame index using the first element of `pos`. This approach may fail for multi-level indexes unless ``error='warn'`` or ``error='ignore'`` is used to bypass the dimension mismatch. See [1]_ for further discussion on multi-dimensional indexing. Examples -------- >>> from fusionlab.utils.spatial_utils import filter_position >>> import pandas as pd >>> df = pd.DataFrame({ ... 'lat': [113.309998, 113.310001], ... 'lon': [22.831362, 22.831364] ... }) >>> # Exact match >>> result_exact = filter_position(df, pos=(113.309998, ... 22.831362), ... pos_cols=('lat', 'lon'), ... find_closest=False) >>> # Nearest match with threshold >>> result_close = filter_position(df, ... pos=(113.31, ... 22.83), ... pos_cols=('lat', ... 'lon'), ... find_closest=True, ... threshold=0.01) See Also -------- fusionlab.utils.data_utils.truncate_data: Truncate multiple DataFrames based on spatial coordinates or index alignment with a base DataFrame. References ---------- .. [1] Smith, J., & Doe, A. (2020). Multi-dimensional indexing in big data. Journal of Spatial Computing, 15(3), 200-210. """ # Initialize filtered_df as the original DataFrame filtered_df = df.copy() # Convert pos to tuple if it's not already if not isinstance(pos, (tuple, list)): pos = (pos,) # Helper function to handle errors def _handle_error(msg, ex_msg=''): if error == 'raise': raise ValueError(msg) elif error == 'warn': warnings.warn(f"{msg}{ex_msg}") # if error == 'ignore', do nothing # If no position columns specified, # treat pos as DataFrame index if pos_cols is None: if len(pos) > 1: _handle_error( "Multiple position values provided but no pos_cols. " "Unable to match multi-level index. ", "Using the first position value only." ) idx = pos[0] # Consider only the first if multiple are given if idx not in df.index: _handle_error( f"Index '{idx}' not found in DataFrame. ", "Returning empty DataFrame." ) return filtered_df.iloc[0:0] # Return empty DF if not found # Filter by index filtered_df = filtered_df.loc[[idx]] return filtered_df # Ensure pos_cols is tuple for consistency if not isinstance(pos_cols, (tuple, list)): pos_cols = (pos_cols,) pos_cols = columns_manager(pos_cols) # Check dimension match between pos and pos_cols if len(pos) != len(pos_cols): # If mismatch, handle based on error param _handle_error( f"pos ({len(pos)}) and pos_cols ({len(pos_cols)}) lengths " "do not match.", " Extra values or columns will be ignored." ) # If ignoring or warning, align up to the shortest length min_len = min(len(pos), len(pos_cols)) pos = pos[:min_len] pos_cols = pos_cols[:min_len] # If find_closest is disabled, do exact matching if not find_closest: # Build query for exact match mask = True for p_val, col in zip(pos, pos_cols): mask &= (filtered_df[col] == p_val) filtered_df = filtered_df[mask] return filtered_df # If find_closest is enabled, use threshold-based filtering # Compute distance (Euclidean if multiple columns, absolute if single) if len(pos_cols) == 1: # Single dimension: absolute difference col = pos_cols[0] distance = (filtered_df[col] - pos[0]).abs() filtered_df = filtered_df[distance <= threshold] else: # Multi-dimensional: Euclidean distance # Summation of squared diffs for each column squared_diff = 0 for p_val, col in zip(pos, pos_cols): squared_diff += (filtered_df[col] - p_val) ** 2 distance = squared_diff ** 0.5 filtered_df = filtered_df[distance <= threshold] # If verbose, provide basic info if verbose >= 1: print(f"Filtered {len(df)} rows to {len(filtered_df)} rows " f"within threshold {threshold}.") # If verbose >= 2, show sample of distance if verbose >= 2 and not filtered_df.empty: print("Example distance values within threshold:") print(distance[distance <= threshold].head()) return filtered_df @is_data_readable(data_to_read='data') @SaveFile @validate_params ({ 'z': ['array-like', str], 'threshold': ['array-like', StrOptions({'auto'}), Real], 'condition': [StrOptions({'auto', 'above', 'below', 'between'})], 'use_negative_criteria': [bool], 'percentile': [Real], 'x': ['array-like', str, None], 'y': ['array-like', str, None], 'data': ['array-like', None], }) def extract_zones_from( z, threshold='auto', condition='auto', use_negative_criteria=True, percentile=10, x=None, y=None, data=None, view=False, plot_type='scatter', figsize=(8, 6), savefile=None, axis_off=False, show_grid=True, **kwargs ): r""" Extracts specific zones by filtering an input array or arrays using a threshold criterion. This function applies a logical mask to the values and retains those which satisfy a chosen condition (e.g. ``'above'``, ``'below'``, or ``'between'`` a specific threshold or thresholds). The threshold can be automatically derived using percentiles if ``'auto'`` is specified. .. math:: \text{mask}(z) \;=\; \begin{cases} 1 & \text{if } z \,\in\, \Omega \\ 0 & \text{otherwise} \end{cases} where :math:`\Omega` is the region of acceptance determined by the threshold mechanism. Parameters ---------- z : array-like, Series, or string The input data to be filtered. If <z> is a string, it is treated as a column name from the provided <data> (see below). If it is an array or Series, it is used directly. threshold : { ``'auto'``, float, int, tuple } The criterion for filtering. If ``'auto'``, the function computes a percentile-based threshold driven by <percentile>. If a float or int is given, the function will filter values above or below that single threshold. If a tuple of length 2 is provided, the function will filter between those bounds. condition : { ``'auto'``, ``'above'``, ``'below'``, ``'between'`` } Defines how the filter is applied relative to the given or derived threshold. If ``'auto'``, the function decides based on <use_negative_criteria>. If ``'above'``, all values satisfying :math:`z > \text{threshold}` are kept. If ``'below'``, then :math:`z < \text{threshold}` are retained. If ``'between'``, a range (low, high) is respected. use_negative_criteria : bool When ``True``, automatically interprets ``'auto'`` condition as filtering from below, akin to negative-based risk. When ``False``, filters from above. percentile : int or float Used only if ``threshold='auto'``. Determines which percentile is used to compute the threshold. For example, if <use_negative_criteria> is ``True``, the <percentile>th percentile is chosen, otherwise the (100 - <percentile>)th percentile is used. x : array-like, Series, or string, optional The x-axis data. If <x> is a string and <data> is provided, the function extracts the relevant column. If <x> is an array or Series, it is used directly. y : array-like, Series, or string, optional Similar to <x>, representing the y-axis data. data : pandas.DataFrame, optional The DataFrame source if <x>, <y>, or <z> are provided as strings referencing column names. view : bool If ``True``, displays a plot of the filtered data. plot_type : { ``'scatter'``, ``'line'``, ``'hist'``, ... } Determines how the data are visualized when <view> is ``True``. figsize : tuple of int The size of the generated figure for plotting. E.g. ``(8,6)`` is typical. axis_off: bool, default=False Remove the axis if set to ``True``. show_grid: bool, default=True Display the plot grid or make it invisible if ``False``. Methods ------- The function itself does not expose methods starting with letters (excluding `_`), as it is a single operation. All steps are executed internally with no additional public methods. Notes ------ .. math:: \mathbf{z}_{\text{filtered}} \;=\; \{ z_i \mid \text{condition}(z_i) \} where :math:`\text{condition}(z_i)` is derived from ``threshold`` and ``condition``. For instance, if ``condition='below'`` and :math:`\tau = \text{threshold}`, then .. math:: \text{condition}(z_i) \;=\; [\, z_i < \tau \,]. This function relies on ``extract_array_from`` (from the fusionlab.core.array_manager) if <z>, <x>, or <y> are passed as strings and a <data> DataFrame is supplied. The user has the option to visualize the retained data points by enabling <view> and customizing <plot_type>. Examples -------- >>> from fusionlab.utils.spatial_utils import extract_zones_from >>> import numpy as np >>> z_data = np.array([0, 2, 5, 10, 15, 20]) >>> result = extract_zones_from(z=z_data, threshold=10, ... condition='above') >>> print(result) See Also -------- fusionlab.core.array_manager.extract_array_from`` : The array extraction utility used for retrieving arrays from DataFrame columns. References ---------- .. [1] Smith, J. & Doe, A. "Advanced Filtering Techniques", Journal of Data Science, 2022. .. [2] Brown, K. "Data Visualization Best Practices", Data Analytics Press, 2021. """ # 1) If z, x, y are strings, extract from DataFrame if data is not None: z, x, y = extract_array_from( data, z, x, y, handle_unknown='raise', # raise if columns not found error='raise', check_size=True, # ensure x,y,z have same length ) # 2) Ensure z, x, y are arrays z = np.asarray(z) if not isinstance(z, np.ndarray) else z x = np.asarray(x) if (x is not None and not isinstance(x, np.ndarray)) else x y = np.asarray(y) if (y is not None and not isinstance(y, np.ndarray)) else y # 3) If threshold='auto', compute it using percentiles # and decide condition based on use_negative_criteria if isinstance(threshold, str) and threshold.lower() == 'auto': if use_negative_criteria: thr_value = np.percentile(z, percentile) # e.g., lower percentile condition = 'below' else: thr_value = np.percentile(z, 100 - percentile) condition = 'above' threshold = thr_value # 4) Build a mask depending on single-value or range threshold if isinstance(threshold, (tuple, list)) and len(threshold) == 2: threshold = validate_length_range(threshold, param_name ='Threshold') if condition == 'auto': condition = 'between' # default if user didn't specify if condition.lower() == 'between': mask = (z >= threshold[0]) & (z <= threshold[1]) else: raise ValueError( "For tuple/list threshold, specify condition='between'" " or handle logic manually." ) else: # Single numeric threshold if condition == 'auto': condition = 'below' if use_negative_criteria else 'above' if condition.lower() == 'above': mask = (z > threshold) elif condition.lower() == 'below': mask = (z < threshold) else: raise ValueError( f"condition={condition} not valid for a single numeric threshold." ) # 5) Filter the arrays z_filtered = z[mask] x_filtered = x[mask] if x is not None else None y_filtered = y[mask] if y is not None else None # 6) Prepare the output # If x & y exist, return a DataFrame with columns [x, y, z]. # If only x, return [x, z]. # Otherwise, return just the filtered z as a Series. x_name = resolve_label(x, default_name ='x') y_name = resolve_label(y, default_name ='y') z_name = resolve_label(z, default_name ='z') if x_filtered is not None and y_filtered is not None: result = pd.DataFrame({x_name: x_filtered, y_name: y_filtered, z_name: z_filtered}) elif x_filtered is not None: result = pd.DataFrame({x_name: x_filtered, z_name: z_filtered}) else: result = pd.Series(z_filtered) # 7) If view=True, build a plot according to plot_type and data availability if view: plt.figure(figsize=figsize) # - If x & y provided, do a 2D scatter of x vs y (colored by z or sized by z). # - If only x provided, do 1D plot with z as the values. # - If neither x nor y, do a histogram of z. if x_filtered is not None and y_filtered is not None: if plot_type == 'scatter': plt.scatter(x_filtered, y_filtered, c=z_filtered, **kwargs) plt.colorbar(label=z_name) plt.xlabel(x_name) plt.ylabel(y_name) plt.title('Filtered Scatter Plot') elif plot_type == 'line': # Plot lines in x-y plane, ignoring z plt.plot(x_filtered, y_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(y_name) plt.title('Filtered Line Plot') else: # fallback plt.scatter(x_filtered, y_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(y_name) plt.title(f'Fallback to Scatter: {plot_type}') elif x_filtered is not None: # only x is given if plot_type == 'scatter': plt.scatter(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title('Filtered Scatter (x vs z)') elif plot_type == 'line': plt.plot(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title('Filtered Line (x vs z)') elif plot_type == 'hist': plt.hist(z_filtered, **kwargs) plt.xlabel(z_name) plt.title('Filtered Histogram (z)') else: # fallback plt.scatter(x_filtered, z_filtered, **kwargs) plt.xlabel(x_name) plt.ylabel(z_name) plt.title(f'Fallback to Scatter: {plot_type}') else: # No x or y => just do a histogram of z plt.hist(z_filtered, **kwargs) plt.xlabel(z_name) plt.title('Filtered Histogram (z)') if axis_off : plt.axis('off') if not show_grid: plt.grid(False ) plt.tight_layout() plt.show() return result @SaveFile def dual_merge( df1: pd.DataFrame, df2: pd.DataFrame, feature_cols: Union[list, tuple] = ('longitude', 'latitude'), find_closest: bool = False, force_coords: bool = False, threshold: float = 0.01, how: str = 'inner', savefile: Optional[str]=None, ) -> pd.DataFrame: """ Merge two DataFrames based on specified feature columns. The function can match the features exactly or find the closest matches within a specified distance threshold. It also allows for overwriting coordinates or feature values from one DataFrame to another when a close match is found. Parameters ---------- df1 : pd.DataFrame The first DataFrame to be merged. It contains the primary data along with the feature columns (e.g., longitude, latitude) to be merged on. df2 : pd.DataFrame The second DataFrame to be merged. It contains the data to be matched with `df1` based on the specified feature columns. feature_cols : tuple or list, default ``('longitude', 'latitude')`` The names of the columns in each DataFrame to merge on. It should contain two columns representing features, such as coordinates (longitude, latitude) or other relevant attributes. These columns will be used to match the rows from `df1` to `df2`. find_closest : bool, default ``False`` If ``True``, the function will attempt to find the closest points in ``df2`` for each point in ``df1`` within the specified distance threshold (`threshold`). If no exact match is found, the closest point within the threshold will be considered. force_coords : bool, default ``False`` If ``True``, when the closest points are found, the coordinates of `df1` will overwrite those of `df2` for the matched points. threshold : float, default ``0.01`` The maximum distance threshold within which points will be considered as "close" for the closest point matching. The value is in the same unit as the feature columns (e.g., degrees for latitude/longitude). how : str, default ``'inner'`` The type of merge to perform. Options include: - ``'inner'``: Only includes points that appear in both DataFrames. - ``'left'``: All points from `df1` are included; unmatched points from `df2` are excluded. - ``'right'``: All points from `df2` are included; unmatched points from `df1` are excluded. - ``'outer'``: Includes all points from both DataFrames, with NaN for unmatched points. Returns ------- pd.DataFrame A DataFrame containing the merged data based on the specified feature columns and merge type. If ``find_closest=True``, it will contain the closest matches within the specified threshold, with coordinates from ``df1`` overwritten if ``force_coordinates=True``. Notes ----- - This function uses a KDTree for efficient nearest-neighbor searching when ``find_closest=True``. This is useful when dealing with large datasets that may not have exact coordinate matches. - When ``force_coordinates=True``, the coordinates from ``df1`` will overwrite those from ``df2`` for the closest points. However, other feature values will be kept from ``df2``. Examples -------- >>> import pandas as pd >>> from fusionlab.utils.datautils import dual_merge >>> df1 = pd.DataFrame({ >>> 'longitude': [1.1, 1.2, 1.3], >>> 'latitude': [2.1, 2.2, 2.3], >>> 'value1': [10, 20, 30] >>> }) >>> df2 = pd.DataFrame({ >>> 'longitude': [1.1, 1.4], >>> 'latitude': [2.1, 2.4], >>> 'value2': [100, 200] >>> }) >>> result = dual_merge(df1, df2, feature_cols=('longitude', 'latitude'), >>> find_closest=True, threshold=0.05) >>> print(result) longitude latitude value1 value2 0 1.1 2.1 10 100 See Also -------- scipy.spatial.cKDTree: Used for finding the closest points in `df2` when ``find_closest=True``. pandas.merge: pandas.DataFrame.merge The pandas merge function, used to merge DataFrames based on columns. References ---------- .. [1] Scipy Documentation, cKDTree https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.cKDTree.html """ # Ensure feature_cols are valid and both contain two elements feature_cols = columns_manager(feature_cols , empty_as_none= False ) if len(feature_cols) != 2: raise ValueError( "feature_cols must contain exactly two features (e.g., longitude, latitude)") # check wether df1 and df2 are both dataframes. are_all_frames_valid(df1, df2 ) # Extract columns from feature_cols for both DataFrames feature1_1, feature1_2 = feature_cols feature2_1, feature2_2 = feature_cols # Filter for relevant columns in both DataFrames df1_coords = df1[[feature1_1, feature1_2]] df2_coords = df2[[feature2_1, feature2_2]] if find_closest: # Use KDTree for fast nearest-neighbor search tree = cKDTree(df2_coords.values) # Query for the closest points in df2 for each point in df1 dist, indices = tree.query( df1_coords.values, distance_upper_bound=threshold) # Filter out points that couldn't find a close match valid_idx = dist != np.inf df1_coords_closest = df1_coords.iloc[valid_idx] df2_coords_closest = df2_coords.iloc[indices[valid_idx]] if force_coords: # Force coordinates of df1 to overwrite df2 df2_coords_closest[feature2_1] = df1_coords_closest[feature1_1].values df2_coords_closest[feature2_2] = df1_coords_closest[feature1_2].values # Update df1 with the closest matches from df2 df1 = df1.iloc[valid_idx] df2 = df2.iloc[indices[valid_idx]] # Perform the merge based on the feature columns merged_data = pd.merge( df1, df2, how=how, left_on=[feature1_1, feature1_2], right_on=[feature2_1, feature2_2] ) return merged_data @isdf def extract_coordinates( df: pd.DataFrame, as_frame: bool = False, drop_xy: bool = False, error: Union[bool, str] = 'raise', verbose: int = 0 ) -> Tuple[Union[Tuple[float, float], pd.DataFrame, None], pd.DataFrame, Tuple[str, str]]: """ Identifies coordinate columns (longitude/latitude or easting/northing) in a DataFrame, returns the coordinates or their central values, and optionally removes the coordinate columns from the DataFrame. Parameters ---------- df : pd.DataFrame The DataFrame expected to contain coordinates (`longitude` and `latitude` or `easting` and `northing`). If both types are present, `longitude` and `latitude` are prioritized. as_frame : bool, default=False If True, returns the coordinate columns as a DataFrame. If False, computes and returns the midpoint values. drop_xy : bool, default=False If True, removes coordinate columns (`longitude`/`latitude` or `easting`/`northing`) from the DataFrame after extracting them. error : Union[bool, str], {'raise', 'warn', 'ignore'} default='raise' If True, raises an error if `df` is not a DataFrame. If set to False, converts errors to warnings. If set to ``"ignore"``, suppresses warnings. verbose : int, default=0 If greater than 0, outputs messages about coordinate detection. Returns ------- Tuple[Union[Tuple[float, float], pd.DataFrame, None], pd.DataFrame, Tuple[str, str]] - A tuple containing either the midpoint (longitude, latitude) or (easting, northing) if `as_frame=False` or the coordinate columns as a DataFrame if `as_frame=True`. - The original DataFrame, optionally with coordinates removed if `drop_xy=True`. - A tuple of detected coordinate column names, or an empty tuple if none are detected. Notes ----- - This function searches for either `longitude`/`latitude` or `easting`/`northing` columns and returns them as coordinates. If both are found, `longitude`/`latitude` is prioritized. - To calculate the midpoint of the coordinates, the function averages the values in the columns: .. math:: \text{midpoint} = \left(\frac{\text{longitude}_{min} + \text{longitude}_{max}}{2}, \frac{\text{latitude}_{min} + \text{latitude}_{max}}{2}\right) Examples -------- >>> import gofast as gf >>> from fusionlab.utils.spatial_utils import extract_coordinates >>> testdata = gf.datasets.make_erp(samples=7, seed=42, as_frame=True) # Extract midpoint coordinates >>> xy, d, xynames = extract_coordinates(testdata) >>> xy, xynames ((110.48627946874444, 26.051952363176344), ('longitude', 'latitude')) # Extract coordinates as a DataFrame without removing columns >>> xy, d, xynames = extract_coordinates(testdata, as_frame=True) >>> xy.head(2) longitude latitude 0 110.485833 26.051389 1 110.485982 26.051577 # Drop coordinate columns from the DataFrame >>> xy, d, xynames = extract_coordinates(testdata, drop_xy=True) >>> xy, xynames ((110.48627946874444, 26.051952363176344), ('longitude', 'latitude')) >>> d.head(2) station resistivity 0 0.0 1.0 1 20.0 167.5 References ---------- .. [1] Fotheringham, A. Stewart, *Geographically Weighted Regression: The Analysis of Spatially Varying Relationships*, Wiley, 2002. See Also -------- pd.DataFrame : Main pandas data structure for handling tabular data. np.nanmean : Computes the mean along specified axis, ignoring NaNs. """ def rename_if_exists(val: str, col: pd.Index, default: str) -> pd.DataFrame: """Rename column in `d` if `val` is found in column names.""" match = list(filter(lambda x: val in x.lower(), col)) if match: df.rename(columns={match[0]: default}, inplace=True) return df # Validate input is a DataFrame if not (hasattr(df, 'columns') and hasattr(df, '__array__')): emsg = ("Expected a DataFrame containing coordinates (`longitude`/" "`latitude` or `easting`/`northing`). Got type: " f"{type(df).__name__!r}") error = str(error).lower().strip() if error == 'raise': raise TypeError(emsg) if error =='warn': warnings.warn(emsg) return None, df, () # Rename columns to standardized names if they contain coordinate values for name, std_name in zip(['lat', 'lon', 'east', 'north'], ['latitude', 'longitude', 'easting', 'northing']): df = rename_if_exists(name, df.columns, std_name) # Check for and prioritize coordinate columns coord_columns = [] for x, y in [('longitude', 'latitude'), ('easting', 'northing')]: if x in df.columns and y in df.columns: coord_columns = [x, y] break # Extract coordinates as DataFrame or midpoint if coord_columns: xy = df[coord_columns] if as_frame else tuple( np.nanmean(df[coord_columns].values, axis=0)) else: xy = None # Drop coordinates if `drop_xy=True` if drop_xy and coord_columns: df.drop(columns=coord_columns, inplace=True) # Verbose messaging if verbose > 0: print("###", "No" if not coord_columns else coord_columns, "coordinates found.") return xy, df, tuple(coord_columns) @Deprecated(reason=( "This function is deprecated and will be removed in future versions. " "Please use `extract_coordinates` instead, which provides enhanced " "flexibility and robustness for coordinates extraction.") ) @isdf def get_xy_coordinates( df, as_frame=False, drop_xy=False, raise_exception=True, verbose=0 ): """Check whether the coordinate values x, y exist in the data. Parameters ------------ df: Dataframe Frame that is expected to contain the longitude/latitude or easting/northing coordinates. Note if all types of coordinates are included in the data frame, the longitude/latitude takes the priority. as_frame: bool, default= False, Returns the coordinates values if included in the data as a frame rather than computing the middle points of the line drop_xy: bool, default=False, Drop the coordinates in the data and return the data transformed inplace raise_exception: bool, default=True raise error message if data is not a dataframe. If set to ``False``, exception is converted to a warning instead. To mute the warning set `raise_exception` to ``mute`` verbose: int, default=0 Send message whether coordinates are detected. returns -------- xy, d, xynames: Tuple xy : tuple of float ( longitude, latitude) or (easting/northing ) if `as_frame` is set to ``True``. d: Dataframe transformed (coordinated removed ) or not xynames: str, the name of coordinates detected. Examples ---------- >>> import gofast as gf >>> from fusionlab.utils.spatial_utils import get_xy_coordinates >>> testdata = gf.datasets.make_erp ( samples =7, seed =42 , as_frame=True) >>> xy, d, xynames = get_xy_coordinates ( testdata, ) >>> xy , xynames ((110.48627946874444, 26.051952363176344), ('longitude', 'latitude')) >>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True ) >>> xy.head(2) longitude latitude easting northing 0 110.485833 26.051389 448565.380621 2.881476e+06 1 110.485982 26.051577 448580.339199 2.881497e+06 >>> # remove longitude and lat in data >>> testdata = testdata.drop (columns =['longitude', 'latitude']) >>> xy, d, xynames = get_xy_coordinates ( testdata, as_frame =True ) >>> xy.head(2) easting northing 0 448565.380621 2.881476e+06 1 448580.339199 2.881497e+06 >>> # note testdata should be transformed inplace when drop_xy is set to True >>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True) >>> xy, xynames ((448610.25612032827, 2881538.4380570543), ('easting', 'northing')) >>> d.head(2) station resistivity 0 0.0 1.0 1 20.0 167.5 >>> testdata.head(2) # coordinates are henceforth been dropped station resistivity 0 0.0 1.0 1 20.0 167.5 >>> xy, d, xynames = get_xy_coordinates ( testdata, drop_xy =True) >>> xy, xynames (None, ()) >>> d.head(2) station resistivity 0 0.0 1.0 1 20.0 167.5 """ def get_value_in ( val, col , default): """ Get the value in the frame columns if `val` exists in """ x = list( filter ( lambda x: x.find (val)>=0 , col) ) if len(x) !=0: # now rename col df.rename (columns = {x[0]: str(default) }, inplace = True ) return df if not ( hasattr ( df, 'columns') and hasattr ( df, '__array__') ) : emsg = ("Expect dataframe containing coordinates longitude/latitude" f" or easting/northing. Got {type (df).__name__!r}") raise_exception = str(raise_exception).lower().strip() if raise_exception=='true': raise TypeError ( emsg ) if raise_exception not in ('mute', 'silence'): warnings.warn( emsg ) return df # check whether coordinates exists in the data columns for name, tname in zip ( ('lat', 'lon', 'east', 'north'), ( 'latitude', 'longitude', 'easting', 'northing') ) : df = get_value_in(name, col = df.columns , default = tname ) # get the exist coodinates coord_columns = [] for x, y in zip ( ( 'longitude', 'easting' ), ( 'latitude', 'northing')): if ( x in df.columns and y in df.columns ): coord_columns.extend ( [x, y] ) xy = df[ coord_columns] if len(coord_columns)!=0 else None if ( not as_frame and xy is not None ) : # take the middle of the line and if both types of # coordinates are supplied , take longitude and latitude # and drop easting and northing xy = tuple ( np.nanmean ( np.array ( xy ) , axis =0 )) [:2] xynames = tuple ( coord_columns)[:2] if ( drop_xy and len( coord_columns) !=0 ): # modifie the data inplace df.drop ( columns=coord_columns, inplace = True ) if verbose: print("###", "No" if len(xynames)==0 else ( tuple (xy.columns) if as_frame else xy), "coordinates found.") return xy , df , xynames @is_data_readable @validate_params ({ 'data': ['array-like'], 'method': [StrOptions({"abs", "absolute", "relative"}), None], }) @isdf def batch_spatial_sampling( data, sample_size=0.1, n_batches=10, stratify_by=None, spatial_bins=10, spatial_cols=None, method="abs", min_relative_ratio=.01, random_state=42, verbose=1, ): """ Batch resample spatial data with stratification over spatial and specified columns. This function divides the dataset into `n_batches` batches, each being a stratified sample of the data. It ensures that samples in the first batch are not present in subsequent batches, and so on. This is particularly useful when dealing with very large datasets that cannot be processed at once, allowing for batch processing in machine learning algorithms. Parameters ---------- data : pandas.DataFrame The input DataFrame from which samples are to be drawn. It must contain the spatial coordinate columns specified in `spatial_cols`, and any additional columns specified in `stratify_by`. sample_size : float or int, optional The total number of samples to draw from `data`. If `sample_size` is a float between 0.0 and 1.0, it represents the fraction of the dataset to include in the sample (e.g., `sample_size=0.1` selects 10% of the data). If `sample_size` is an integer, it represents the absolute number of samples to select. The default is ``0.1``. n_batches : int, optional The number of batches to divide the total samples into. The samples are divided as evenly as possible among the batches. The default is ``10``. stratify_by : str, list of str, optional A list of column names in `data` to use for stratification. The sampling will ensure that the distribution of these columns in each batch matches the distribution in the original dataset. spatial_bins : int or tuple/list of int, optional The number of bins to divide the spatial coordinates into for stratification. If an integer, the same number of bins is used for all spatial dimensions. If a tuple or list, its length must match the number of spatial columns specified in `spatial_cols`, and each element specifies the number of bins for that spatial dimension. The default is ``10``. spatial_cols : list or tuple of str, optional A list of column names in `data` representing spatial coordinates. The function can accept one or two columns (e.g., longitude and latitude). If ``None``, the function will look for columns named `'longitude'` and/or `'latitude'` in `data`. If only one spatial column is provided or found, a warning is issued, suggesting that providing both spatial columns is recommended for more accurate sampling. If more than two columns are provided, an error is raised. method : str, {'abs', 'relative'}, default='abs' Defines how the sample size is determined: - ``'abs'`` or ``'absolute'``: Uses a **fixed** sampling proportion based on `sample_size`. - ``'relative'``: Dynamically **scales** sampling based on dataset stratification, ensuring that all stratification groups receive a proportional sample while maintaining a minimum sampling ratio (controlled by `min_relative_ratio`). When ``method='relative'``, the function ensures that even small stratification groups receive a sufficient sample by applying `min_relative_ratio`. min_relative_ratio : float, default=0.01 Controls the **minimum allowable fraction** of records that must be sampled when ``method='relative'``. - Ensures that no group is **undersampled** to zero, even if its natural proportion in the dataset is very small. - Must be a value between ``0`` and ``1``. - The default value (``0.01``) means that at **least 1% of the total dataset** will be sampled from each stratification group, regardless of its relative size. **Example Scenarios:** - If `min_relative_ratio=0.05`, then each group **must** contribute **at least 5%** of the total dataset size (if possible). - If a group is too small to reach this minimum, its entire subset is sampled instead. - This ensures that no group receives **less than ``min_relative_ratio × total samples**``. random_state : int, optional Controls the randomness of the sampling for reproducibility. This integer seed is used to initialize the random number generator. The default is ``42``. verbose: bool, default=False, If `True`, displays a progress bar and detailed status messages during execution. Useful for monitoring the process, especially when working with large datasets. Returns ------- batches : list of pandas.DataFrame A list of DataFrames, each representing a batch of the stratified sampled data. Notes ----- The function performs stratified sampling based on spatial bins and other specified stratification columns. Spatial coordinates are binned using quantile-based discretization (:func:`pandas.qcut`), ensuring each bin has approximately the same number of observations. The total number of samples, :math:`n`, is divided among the batches, and within each batch, samples are drawn in a stratified manner. The sample size for each batch is calculated as: .. math:: n_{\text{batch}} = \left\lfloor \frac{n}{n_{\text{batches}}} \right\rfloor The remaining samples are distributed among the first few batches: .. math:: n_{\text{leftover}} = n \mod n_{\text{batches}} For each batch, the number of samples per stratification group is calculated based on the proportion of the group size to the remaining data size: .. math:: n_{i} = \left\lceil \frac{N_{i}}{N_{\text{remaining}}}\\ \times n_{\text{batch}} \right\rceil where: - :math:`N_{i}` is the size of group :math:`i`. - :math:`N_{\text{remaining}}` is the total number of samples remaining in the data. - :math:`n_{i}` is the number of samples to draw from group :math:`i`. After sampling, the selected samples are removed from the remaining data to ensure that they are not selected again in subsequent batches. Examples -------- Examples -------- **Case 1: Stratified Sampling (Using 'year' and 'geological_category')** >>> from fusionlab.utils.spatial_utils import batch_spatial_sampling >>> import pandas as pd >>> import numpy as np >>> # Create a sample dataset >>> np.random.seed(42) >>> df = pd.DataFrame({ ... "id": np.arange(10_000), ... "longitude": np.random.uniform(-180, 180, 10_000), # Geographic range ... "latitude": np.random.uniform(-90, 90, 10_000), # Geographic range ... "year": np.random.randint(1990, 2025, 10_000), # Temporal feature ... "geological_category": np.random.choice( ... ["Sedimentary", "Metamorphic", "Igneous"], 10_000 ... ), ... "value": np.random.randn(10_000) # Random numerical data ... }) >>> # Perform stratified batch sampling >>> sampled_batches = batch_spatial_sampling( ... data=df, ... sample_size=0.05, # 5% of total data ... n_batches=5, ... stratify_by=['year', 'geological_category'], # Stratify by year & geology type ... spatial_bins=(10, 15), ... spatial_cols=['longitude', 'latitude'], ... random_state=42 ... ) >>> for i, batch in enumerate(sampled_batches): ... print(f"Batch {i+1}: {batch.shape}") Creating 5 stratified batches with a total of 500 samples... Batch Sampling Progress: 100%|██████████| 5/5 [00:01<00:00, 4.43it/s] Batch sampling completed. 5 batches created. **Stratified Sampling Results:** Batch 1: (100, 6) Batch 2: (100, 6) Batch 3: (100, 6) Batch 4: (100, 6) Batch 5: (100, 6) **Case 2: Random Sampling (Without Stratification)** >>> sampled_batches = batch_spatial_sampling( ... data=df, ... sample_size=0.05, ... n_batches=5, ... stratify_by=None, # No stratification ... spatial_bins=(10, 15), ... spatial_cols=['longitude', 'latitude'], ... random_state=42 ... ) >>> for i, batch in enumerate(sampled_batches): ... print(f"Batch {i+1}: {batch.shape}") Creating 5 random batches with a total of 500 samples... Batch Sampling Progress: 100%|██████████| 5/5 [00:00<00:00, 247.27it/s] Batch sampling completed. 5 batches created. **Random Sampling Results:** Batch 1: (100, 6) Batch 2: (100, 6) Batch 3: (100, 6) Batch 4: (100, 6) Batch 5: (100, 6) See Also -------- spatial_sampling : Perform stratified sampling without batching. References ---------- .. [1] Kotsiantis, S., Kanellopoulos, D., & Pintelas, P. (2006). "Data preprocessing for supervised learning." *International Journal of Computer Science*, 1(2), 111-117. """ data = data.copy() total_samples = sample_size if isinstance(sample_size, float): if not 0 < sample_size < 1: raise ValueError("When sample_size is a float, it must be between 0 and 1.") total_samples = int(len(data) * sample_size) elif isinstance(sample_size, int): if sample_size <= 0: raise ValueError("sample_size must be positive.") else: raise ValueError("sample_size must be a float or int.") if total_samples > len(data): raise ValueError("sample_size is larger than the dataset.") if n_batches <= 0: raise ValueError("n_batches must be a positive integer.") sample_size_per_batch = total_samples // n_batches leftover = total_samples % n_batches batches = [] remaining_data = data.copy() sampled_indices = set() rng = np.random.RandomState(random_state) # Set default spatial columns if not specified spatial_cols = columns_manager(spatial_cols) if spatial_cols is None: spatial_cols = [] if 'longitude' in data.columns: spatial_cols.append('longitude') if 'latitude' in data.columns: spatial_cols.append('latitude') if not spatial_cols: raise ValueError( "No spatial columns specified and " "'longitude' and 'latitude' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' found. " "Using it for spatial stratification. " "For more accurate sampling, providing both spatial " "columns is recommended.", UserWarning ) else: if not isinstance(spatial_cols, (list, tuple)): raise ValueError( "spatial_cols must be a list or tuple of column names." ) if len(spatial_cols) > 2: raise ValueError( "spatial_cols can have at most two columns." ) for col in spatial_cols: if col not in data.columns: raise ValueError( f"Spatial column '{col}' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' specified. " "For more accurate sampling, providing two spatial columns " "is recommended.", UserWarning ) # Validate spatial_bins if isinstance(spatial_bins, int): n_bins_list = [spatial_bins] * len(spatial_cols) elif isinstance(spatial_bins, (tuple, list)): if len(spatial_bins) != len(spatial_cols): raise ValueError( "Length of spatial_bins must match number of spatial_cols." ) n_bins_list = list(spatial_bins) else: raise ValueError( "spatial_bins must be int or tuple/list of int." ) # Create spatial bins in the original data for col, n_bins, axis in zip( spatial_cols, n_bins_list, ['x_bin', 'y_bin'] ): data[axis] = pd.qcut( data[col], q=n_bins, duplicates='drop' ) # Create combined stratification key in original data # Create combined stratification key if stratify_by is not None: strat_columns = stratify_by + [ axis for axis in ['x_bin', 'y_bin'][:len(spatial_cols)] ] if verbose and len(data) > 10_000: print(f"\nGenerating stratification keys for {len(data):,}" " records...") print(" This may take some time. Please be patient...") # Optimized vectorized concatenation data['strat_key'] = data[strat_columns].astype(str).agg('_'.join, axis=1) if verbose and len(data) > 10_000: print("Stratification keys generated" f" successfully for {len(data):,} records.") else: data['strat_key'] = 'all_data' # Single group for random sampling # Initialize remaining data remaining_data = data.copy() batches = [] # Set initial random state rng = np.random.RandomState(random_state) if verbose: print(f"\nCreating {n_batches} stratified batches with" f" a total of {total_samples:,} samples...") # TQDM progress bar if verbose and HAS_TQDM: progbar = tqdm( range(n_batches), total=n_batches, ascii=True, ncols=80, desc="Batch Sampling Progress" ) if method=="relative": min_relative_ratio= assert_ratio( min_relative_ratio, bounds=(0, 1), excludes = (0, 1), name="`min_relative_ratio`" ) for batch_idx in range(n_batches): # Adjust sample size for batches if total_samples # is not divisible by n_batches if batch_idx < leftover: batch_sample_size = sample_size_per_batch + 1 else: batch_sample_size = sample_size_per_batch # if batch_sample_size > len(remaining_data): # batch_sample_size = len(remaining_data) batch_sample_size = min(batch_sample_size, len(remaining_data)) # Group remaining data by stratification key grouped = remaining_data.groupby('strat_key') # Calculate number of samples per group group_sizes = grouped.size() total_size = group_sizes.sum() if method in ["abs", "absolute"]: group_sample_sizes = ( (group_sizes / total_size * batch_sample_size) .round() .astype(int) ) else: # "relative" relative_scale = np.clip(batch_sample_size / len( remaining_data), min_relative_ratio, 1) group_sample_sizes = ( (group_sizes * relative_scale) .round() .astype(int) ) # Sample data from each group sampled_indices = [] for strat_value, group in grouped: n = group_sample_sizes.loc[strat_value] if n > 0 and len(group) > 0: sampled_group = group.sample( n=min(n, len(group)), random_state=rng.randint(0, 10000) ) sampled_indices.extend(sampled_group.index) # Create the sampled DataFrame batch_sampled_data = remaining_data.loc[sampled_indices] batches.append(batch_sampled_data.drop( columns=['strat_key'] + [axis for axis in [ 'x_bin', 'y_bin'][:len(spatial_cols)]])) # Remove sampled data from remaining_data remaining_data = remaining_data.drop(index=sampled_indices) if len(remaining_data) == 0: break # No more data to sample if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() if verbose: print(f"\nBatch sampling completed. {len(batches)} batches created.") has_empty_batches = any([ b.empty for b in batches]) if verbose and has_empty_batches: warnings.warn( "\nNo records were sampled. This is likely due to" " insufficient data for the specified stratification" f" columns {stratify_by}. To resolve this, consider:\n" " • Using a different stratification method.\n" " • Increasing the dataset size to include more" " representative data.\n" " • Adjusting the sample size to ensure sufficient" " records per group.\n" " • Or, setting `stratify_by=None` to perform" " random sampling instead." ) return batches
[docs] @SaveFile @is_data_readable @validate_params ({ 'data': ['array-like'], 'method': [StrOptions({"abs", "absolute", "relative"}), None], }) @isdf def spatial_sampling( data, sample_size=0.01, stratify_by=None, spatial_bins=10, spatial_cols=None, method='abs', min_relative_ratio=.01, random_state=42, savefile=None, verbose=1, ): """ Sample spatial data intelligently to represent the distribution of the whole area and include different years. This function performs stratified sampling on spatial data, ensuring that the sample reflects both spatial distribution and temporal aspects of the entire dataset [1]_. It combines spatial stratification based on coordinates and additional stratification columns specified by the user. Parameters ---------- data : pandas.DataFrame The input DataFrame to sample from. Must contain spatial coordinate columns (e.g., `'longitude'`, `'latitude'`) and any columns specified in ``stratify_by``. sample_size : float or int, optional The proportion or absolute number of samples to select. If float, should be between 0.0 and 1.0 and represents the fraction of the dataset to include in the sample. If int, represents the absolute number of samples to select. Default is ``0.01`` (1% of the data). stratify_by : list of str, optional List of column names to stratify by. spatial_bins : int or tuple/list of int, optional Number of bins to divide the spatial coordinates into. If an integer, the same number of bins is used for all spatial dimensions. If a tuple or list, its length must match the number of spatial columns, specifying the number of bins for each spatial dimension. Default is ``10``. spatial_cols : list or tuple of str, optional List of spatial coordinate column names. Can accept one or two columns. If ``None``, the function checks for columns named `'longitude'` and/or `'latitude'` in ``data``. If only one spatial column is provided or found, a warning is issued, suggesting that providing both spatial columns is recommended for more accurate sampling. If more than two columns are provided, an error is raised. method : str, {'abs', 'relative'}, default='abs' Defines how the sample size is determined: - ``'abs'`` or ``'absolute'``: Uses a **fixed** sampling proportion based on `sample_size`. - ``'relative'``: Dynamically **scales** sampling based on dataset stratification, ensuring that all stratification groups receive a proportional sample while maintaining a minimum sampling ratio (controlled by `min_relative_ratio`). When ``method='relative'``, the function ensures that even small stratification groups receive a sufficient sample by applying `min_relative_ratio`. min_relative_ratio : float, default=0.01 Controls the **minimum allowable fraction** of records that must be sampled when ``method='relative'``. - Ensures that no group is **undersampled** to zero, even if its natural proportion in the dataset is very small. - Must be a value between ``0`` and ``1``. - The default value (``0.01``) means that at **least 1% of the total dataset** will be sampled from each stratification group, regardless of its relative size. **Example Scenarios:** - If `min_relative_ratio=0.05`, then each group **must** contribute **at least 5%** of the total dataset size (if possible). - If a group is too small to reach this minimum, its entire subset is sampled instead. - This ensures that no group receives **less than ``min_relative_ratio × total samples**``. random_state : int, optional Random seed for reproducibility. Default is ``42``. verbose: bool, default=False, If `True`, displays a progress bar and detailed status messages during execution. Useful for monitoring the process, especially when working with large datasets. Returns ------- sampled_data : pandas.DataFrame A sampled DataFrame representing the distribution of the whole area and including different years. Notes ----- The function performs stratified sampling based on spatial bins and other specified stratification columns. Spatial coordinates are binned using quantile-based discretization (:func:`pandas.qcut`), ensuring each bin has approximately the same number of observations. Let :math:`N` be the total number of samples in ``data``, and :math:`n` be the desired sample size. The function calculates the number of samples to draw from each stratification group based on the proportion of the group size to the total dataset size: .. math:: n_i = \left\lceil \frac{N_i}{N} \times n \right\rceil where :math:`N_i` is the size of group :math:`i`, and :math:`n_i` is the number of samples to draw from group :math:`i`. The function ensures that: - All specified spatial and stratification columns exist in ``data``. - The number of spatial bins matches the number of spatial columns. - The sample size is valid (positive float between 0 and 1, or positive integer). Warnings are issued if: - Only one spatial column is used, suggesting that using two spatial columns is recommended for more accurate sampling. Examples -------- >>> from fusionlab.utils.spatial_utils import spatial_sampling >>> import pandas as pd >>> # Assume 'df' is a pandas DataFrame with columns >>> # 'longitude', 'latitude', 'year', and other data. >>> sampled_df = spatial_sampling( ... data=df, ... sample_size=0.05, ... stratify_by=['year', 'geological_category'], ... spatial_bins=(10, 15), ... spatial_cols=['longitude', 'latitude'], ... random_state=42 ... ) >>> print(sampled_df.shape) See Also -------- pandas.qcut : Quantile-based discretization function used for binning. sklearn.model_selection.StratifiedShuffleSplit : For stratified sampling. batch_spatial_sampling: Resample spatial data with batching. References ---------- .. [1] Kotsiantis, S., Kanellopoulos, D., & Pintelas, P. (2006). "Data preprocessing for supervised learning." *International Journal of Computer Science*, 1(2), 111-117. """ data = data.copy() # Set default spatial columns if not specified spatial_cols= columns_manager(spatial_cols) if spatial_cols is None: spatial_cols = [] if 'longitude' in data.columns: spatial_cols.append('longitude') if 'latitude' in data.columns: spatial_cols.append('latitude') if not spatial_cols: raise ValueError( "No spatial columns specified and " "'longitude' and 'latitude' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' found. " "Using it for spatial stratification. " "For more accurate sampling, providing both spatial " "columns is recommended.", UserWarning ) else: if not isinstance(spatial_cols, (list, tuple)): raise ValueError( "spatial_cols must be a list or tuple of column names." ) if len(spatial_cols) > 2: raise ValueError( "spatial_cols can have at most two columns." ) for col in spatial_cols: if col not in data.columns: raise ValueError( f"Spatial column '{col}' not found in data." ) if len(spatial_cols) == 1: warnings.warn( f"Only one spatial column '{spatial_cols[0]}' specified. " "For more accurate sampling, providing two spatial columns " "is recommended.", UserWarning ) # Validate spatial_bins if isinstance(spatial_bins, int): n_bins = validate_positive_integer( spatial_bins, 'spatial_bins' ) n_bins_list = [n_bins] * len(spatial_cols) elif isinstance(spatial_bins, (tuple, list)): if len(spatial_bins) != len(spatial_cols): raise ValueError( "Length of spatial_bins must match number of spatial_cols." ) n_bins_list = [ validate_positive_integer( n, 'spatial_bins' ) for n in spatial_bins ] else: raise ValueError( "spatial_bins must be int or tuple/list of int." ) # if verbose and HAS_TQDM are True. if verbose and HAS_TQDM: progbar = tqdm( zip(spatial_cols, n_bins_list, ['x_bin', 'y_bin']), total=len(spatial_cols), ascii=True, ncols=77, desc=f"{'Creating spat. bins: ' + str(len(spatial_cols)):<20}" ) # Create spatial bins for col, n_bins, axis in zip( spatial_cols, n_bins_list, ['x_bin', 'y_bin']): data[axis] = pd.qcut( data[col], q=n_bins, duplicates='drop' ) if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() stratify_by= columns_manager(stratify_by, empty_as_none=False ) # Create combined stratification key strat_columns = stratify_by + [ axis for axis in ['x_bin', 'y_bin'][:len(spatial_cols)] ] if verbose and len(data) > 10_000: print("\nGenerating stratification keys...") print(f"This may take some time for {len(data):,}" " records. Please be patient...") # data['strat_key'] = data[strat_columns].apply( # lambda row: '_'.join(row.values.astype(str)), # axis=1 # ) # Using .agg and .astype(str) for vectorized string concatenation data['strat_key'] = data[strat_columns].astype(str).agg('_'.join, axis=1) # Verbose message when done if verbose and len(data)> 10_000: print("Stratification keys generated successfully" f" for {len(data):,} records.") # Determine total number of samples if isinstance(sample_size, float): if not 0 < sample_size < 1: raise ValueError( "When sample_size is a float, it must be between 0 and 1." ) n_samples = int(len(data) * sample_size) elif isinstance(sample_size, int): n_samples = validate_positive_integer( sample_size, 'sample_size' ) else: raise ValueError( "sample_size must be a positive float or int." ) # Group data by stratification key grouped = data.groupby('strat_key') if verbose: print(f"Data grouped into {len(grouped):,} stratified bins.") # Apply stratification if stratify_by is provided if stratify_by is not None: # Calculate number of samples per group group_sizes = grouped.size() total_size = group_sizes.sum() if method in ["abs", "absolute"]: group_sample_sizes = ( (group_sizes / total_size * n_samples) .round() .astype(int) ) else: # "relative" min_relative_ratio = assert_ratio( min_relative_ratio, bounds=(0, 1), exclude_values= [0, 1], name="`min_relative_ratio`" ) relative_scale = np.clip( n_samples / len(data), min_relative_ratio, 1) group_sample_sizes = ( (group_sizes * relative_scale) .round() .astype(int) ) # Sample data from each group sampled_indices = [] np.random.seed(random_state) # Use tqdm to wrap the grouped iterator # if verbose and HAS_TQM are True. if verbose and HAS_TQDM: progbar = tqdm( grouped, total=len(grouped), ascii=True, ncols=77, desc=f"Sampling {n_samples:,} records" ) for strat_value, group in grouped: n = group_sample_sizes.loc[strat_value] if n > 0 and len(group) > 0: sampled_group = group.sample( n=min(n, len(group)), random_state=np.random.randint( 0, 10_000 ) ) sampled_indices.extend( sampled_group.index ) if verbose and HAS_TQDM: progbar.update(1) if verbose and HAS_TQDM: progbar.close() else: sampled_indices = np.random.choice( data.index, size=n_samples, replace=False ) # Create the sampled DataFrame sampled_data = data.loc[ sampled_indices ] # Drop helper columns cols_to_drop = ['strat_key'] + [ axis for axis in ['x_bin', 'y_bin'][:len(spatial_cols)] ] sampled_data = sampled_data.drop( columns=cols_to_drop ) if verbose: print(f"\nSampling completed: {len(sampled_indices):,}" " records selected.") if verbose and sampled_data.empty: warnings.warn( "\nNo records were sampled. This is likely due to" " insufficient data for the specified stratification" f" columns {stratify_by}. To resolve this, consider:\n" " • Using a different stratification method.\n" " • Increasing the dataset size to include more" " representative data.\n" " • Adjusting the sample size to ensure sufficient" " records per group.\n" " • Or, setting `stratify_by=None` to perform" " random sampling instead." ) return sampled_data.reset_index( drop=True )