Anomaly Detection Examples

This section of the gallery provides practical examples of using the anomaly detection components available in fusionlab-learn. These components can be used to build standalone anomaly detection models or be integrated into larger forecasting frameworks like XTFT to make them anomaly-aware.

We will cover:

  1. Using LSTMAutoencoderAnomaly for reconstruction-based anomaly detection.

  2. Conceptual usage of SequenceAnomalyScoreLayer for feature-based anomaly scoring.

  3. Using PredictionErrorAnomalyScore to derive anomaly scores from prediction errors.

Prerequisites

Ensure you have fusionlab-learn and its dependencies installed. Example 1 also uses matplotlib for plotting and scikit-learn for scaling.

pip install fusionlab-learn matplotlib scikit-learn

Example 1: LSTM Autoencoder for Anomaly Detection

The LSTMAutoencoderAnomaly learns to reconstruct normal time series sequences. Anomalous sequences are expected to have higher reconstruction errors.

Workflow:

  1. Generate synthetic time series data with injected anomalies.

  2. Preprocess the data (scaling, sequence creation).

  3. Define, compile, and train the LSTMAutoencoderAnomaly model.

  4. Calculate reconstruction errors on all sequences.

  5. Identify anomalies by thresholding the reconstruction errors.

  6. Visualize the results.

Step 1.1: Imports and Setup

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import warnings
import os

# FusionLab imports
from fusionlab.nn.anomaly_detection import LSTMAutoencoderAnomaly
from fusionlab.nn.utils import create_sequences # For sequence prep

# Suppress warnings and TF logs
warnings.filterwarnings('ignore')
tf.get_logger().setLevel('ERROR')
if hasattr(tf, 'autograph'):
    tf.autograph.set_verbosity(0)

output_dir_ad = "./anomaly_detection_gallery_output"
os.makedirs(output_dir_ad, exist_ok=True)
print("Libraries imported for LSTM Autoencoder example.")

Step 1.2: Generate Data with Anomalies

# Fix the seed so the noise (and the plots) are reproducible
np.random.seed(42)

# Create a sine wave with some noise
time = np.arange(0, 200, 0.5)
signal = np.sin(time * 0.1) + np.random.normal(0, 0.1, len(time))

# Inject anomalies (slices are array indices, not time values)
signal_with_anomalies = signal.copy()
signal_with_anomalies[50:60] += 2.5  # Spike up
signal_with_anomalies[150:155] -= 2.0 # Dip down

df_ad = pd.DataFrame({'Timestamp': time, 'Value': signal_with_anomalies})
print(f"Generated data shape: {df_ad.shape}")

# Visualize the data
plt.figure(figsize=(12, 4))
plt.plot(df_ad['Timestamp'], df_ad['Value'], label='Signal with Anomalies')
plt.title('Synthetic Time Series with Injected Anomalies')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
# plt.savefig(os.path.join(output_dir_ad, "ad_data_with_anomalies.png"))
plt.show()

Step 1.3: Preprocessing and Sequence Creation

scaler_ad = StandardScaler()
df_ad['ScaledValue'] = scaler_ad.fit_transform(df_ad[['Value']])

sequence_length_ad = 20 # Length of input sequences for autoencoder
X_sequences, _ = create_sequences(
    df_ad[['ScaledValue']],
    sequence_length=sequence_length_ad,
    target_col='ScaledValue',
    forecast_horizon=0,
    drop_last=False,
    verbose=0
)
# An autoencoder reconstructs its own input, so the targets equal the inputs
y_sequences = X_sequences.copy()

X_train_ad = X_sequences.reshape(
    X_sequences.shape[0], sequence_length_ad, 1
    ).astype(np.float32)
y_train_ad = y_sequences.reshape(
    y_sequences.shape[0], sequence_length_ad, 1
    ).astype(np.float32)

print(f"\nTraining sequences (X) shape: {X_train_ad.shape}")
print(f"Target sequences (y) shape: {y_train_ad.shape}")
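
For readers without the library at hand, the windowing step can be approximated in plain NumPy. The sketch below is an assumption about what sequence creation amounts to for an autoencoder (overlapping windows with a stride of one and no separate target); it is not the implementation of create_sequences, and the helper name make_windows is hypothetical.

```python
import numpy as np

def make_windows(values, window):
    """Return overlapping windows of shape (n_windows, window, 1), stride 1."""
    values = np.asarray(values, dtype=np.float32).ravel()
    # sliding_window_view gives a read-only view of shape (n - window + 1, window)
    view = np.lib.stride_tricks.sliding_window_view(values, window)
    # Copy to get a writable array, then add a trailing feature axis
    return view.copy()[..., np.newaxis]

series = np.sin(np.arange(0, 200, 0.5) * 0.1)  # 400 points, as in Step 1.2
windows = make_windows(series, window=20)
print(windows.shape)  # (381, 20, 1)
```

Window i covers points i through i + 19 of the series, which is why later steps attribute a window's score to index i + sequence_length_ad - 1.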

Step 1.4: Define, Compile, and Train LSTM Autoencoder

Next we define the LSTMAutoencoderAnomaly model. A bidirectional encoder or a bottleneck dense layer can give the encoder’s final state a different dimension from the decoder’s LSTM units. In that case the state is projected to match the decoder’s units before it is used as the decoder’s initial state.

latent_dim_ad = 8
lstm_units_ad = 16
n_features_ad = 1

autoencoder_model = LSTMAutoencoderAnomaly(
    latent_dim=latent_dim_ad,
    lstm_units=lstm_units_ad, # Decoder LSTMs will have this many units
    num_encoder_layers=1,
    num_decoder_layers=1,
    n_features=n_features_ad,
    n_repeats=sequence_length_ad,
    use_bidirectional_encoder=True, # Encoder state is wider than lstm_units
    use_bottleneck_dense=False,
    name="lstm_autoencoder_anomaly_detector"
)

autoencoder_model.compile(optimizer='adam', loss='mse')
print("\nLSTM Autoencoder model compiled.")

print("Training LSTM Autoencoder...")
# Build the model with the input shape before fitting
# This ensures all layers, including conditional ones in build, are created.
autoencoder_model.build(input_shape=(None, sequence_length_ad, n_features_ad))
# autoencoder_model.summary() # Optional: view model structure

history_ad = autoencoder_model.fit(
    X_train_ad, y_train_ad,
    epochs=20,
    batch_size=32,
    shuffle=True,
    verbose=0
)
print("Training complete.")
if history_ad and history_ad.history.get('loss'):
    print(f"Final training loss: {history_ad.history['loss'][-1]:.4f}")
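
The state projection described in this step can be illustrated with stock Keras layers. This is a minimal sketch of the general technique, not the internals of LSTMAutoencoderAnomaly: the layer arrangement and variable names are assumptions made for illustration.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

seq_len, n_features, units = 20, 1, 16

inputs = layers.Input(shape=(seq_len, n_features))

# With return_state=True a bidirectional LSTM returns five tensors:
# output, forward h, forward c, backward h, backward c
_, fwd_h, fwd_c, bwd_h, bwd_c = layers.Bidirectional(
    layers.LSTM(units, return_state=True)
)(inputs)

# Concatenating both directions doubles the state width: (batch, 2 * units)
h_cat = layers.Concatenate()([fwd_h, bwd_h])
c_cat = layers.Concatenate()([fwd_c, bwd_c])

# Project back to `units` so the states fit the decoder LSTM
h0 = layers.Dense(units)(h_cat)
c0 = layers.Dense(units)(c_cat)

# Decoder: repeat the projected state over time and decode from it
repeated = layers.RepeatVector(seq_len)(h0)
decoded = layers.LSTM(units, return_sequences=True)(
    repeated, initial_state=[h0, c0]
)
outputs = layers.TimeDistributed(layers.Dense(n_features))(decoded)

model = tf.keras.Model(inputs, outputs)
print(h_cat.shape, h0.shape, model.output_shape)
```

Without the two Dense projections, the decoder would receive states of width 32 while expecting 16, and the call with initial_state would fail on the shape mismatch.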

Step 1.5: Calculate Reconstruction Errors (Anomaly Scores)

reconstructions_ad = autoencoder_model.predict(X_train_ad, verbose=0)
reconstruction_errors = autoencoder_model.compute_reconstruction_error(
    X_train_ad, reconstructions_ad
)
# Convert to a NumPy array in case a tensor is returned
reconstruction_errors = np.asarray(reconstruction_errors)
print(f"\nReconstruction errors shape: {reconstruction_errors.shape}")

# Assign each sequence's error to the last time step of that sequence;
# the first sequence_length_ad - 1 points have no score and stay NaN
anomaly_scores_ts = np.full(len(df_ad), np.nan)
for i, error_val in enumerate(reconstruction_errors):
    if i + sequence_length_ad - 1 < len(anomaly_scores_ts): # Boundary check
        anomaly_scores_ts[i + sequence_length_ad - 1] = error_val
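
The two operations in this step can be written out in NumPy on a toy input. The sketch assumes that compute_reconstruction_error returns one mean squared error per sequence (the "Reconstruction Error (MSE)" axis label used in Step 1.6 suggests this, but it is not confirmed here); the helper names are hypothetical.

```python
import numpy as np

def per_sequence_mse(x, x_hat):
    """Mean squared error over the time and feature axes: (N, T, F) -> (N,)."""
    return np.mean((x - x_hat) ** 2, axis=(1, 2))

def align_to_series(errors, series_len, window):
    """Place each window's error at the index of that window's last time step."""
    scores = np.full(series_len, np.nan)
    n = min(len(errors), max(0, series_len - window + 1))
    scores[window - 1 : window - 1 + n] = errors[:n]
    return scores

# Toy case: 3 windows of length 2 over a series of 4 points
x = np.zeros((3, 2, 1))
x_hat = np.array([[[0.0], [0.0]], [[1.0], [1.0]], [[0.0], [2.0]]])

errors = per_sequence_mse(x, x_hat)
scores = align_to_series(errors, series_len=4, window=2)
print(errors)  # [0. 1. 2.]
print(scores)  # first entry is NaN, followed by 0, 1, 2
```

The slice assignment in align_to_series is a vectorised equivalent of the loop with the boundary check above.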

Step 1.6: Identify Anomalies and Visualize

# Filter out NaNs from reconstruction_errors before calculating percentile
valid_errors = reconstruction_errors[~np.isnan(reconstruction_errors)]
if len(valid_errors) > 0:
    threshold_ad = np.percentile(valid_errors, 95)
    print(f"Anomaly threshold (95th percentile of errors): {threshold_ad:.4f}")
    anomalous_indices = np.where(reconstruction_errors > threshold_ad)[0]
    anomalous_time_points = [
        idx + sequence_length_ad - 1 for idx in anomalous_indices
        if idx + sequence_length_ad - 1 < len(df_ad) # Boundary check
        ]
else:
    print("No valid reconstruction errors to calculate threshold.")
    threshold_ad = np.inf # Set to infinity if no errors
    anomalous_time_points = []


plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(df_ad['Timestamp'], df_ad['Value'], label='Original Signal')
if anomalous_time_points:
    plt.scatter(df_ad['Timestamp'].iloc[anomalous_time_points],
                df_ad['Value'].iloc[anomalous_time_points],
                color='red', label='Detected Anomalies', marker='o', s=50, zorder=5)
plt.title('Signal with Detected Anomalies (LSTM Autoencoder)')
plt.ylabel('Value')
plt.legend()
plt.grid(True)

plt.subplot(2, 1, 2)
plt.plot(df_ad['Timestamp'], anomaly_scores_ts,
         label='Reconstruction Error (Anomaly Score)', color='orange')
if np.isfinite(threshold_ad): # Only plot threshold if it's finite
    plt.axhline(threshold_ad, color='red', linestyle='--',
                label=f'Anomaly Threshold ({threshold_ad:.2f})')
plt.title('Anomaly Scores Over Time')
plt.xlabel('Time')
plt.ylabel('Reconstruction Error (MSE)')
plt.legend()
plt.grid(True)
plt.tight_layout()
# plt.savefig(os.path.join(output_dir_ad, "ad_lstm_ae_results.png"))
plt.show()

Example Output Plot (LSTM Autoencoder):

LSTM Autoencoder Anomaly Detection

Top: Original signal with detected anomalies. Bottom: Reconstruction error over time with the anomaly threshold.
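
One property of the percentile threshold from Step 1.6 is worth making explicit: because it is computed from the same errors it is applied to, roughly 5% of the sequences are flagged whether or not the data contain anomalies. The sketch below illustrates this with invented error values (they are not output from the model above).

```python
import numpy as np

rng = np.random.default_rng(0)
# Hypothetical reconstruction errors for 381 sequences with no anomalies at all
errors = rng.exponential(scale=0.01, size=381)

threshold = np.percentile(errors, 95)
flagged = np.where(errors > threshold)[0]
fraction = len(flagged) / len(errors)
print(f"flagged {len(flagged)} of {len(errors)} sequences ({fraction:.1%})")

# Map each flagged window to the index of its last time step, as in Step 1.6
window = 20
flagged_time_idx = flagged + window - 1
```

For this reason the percentile is best read as a budget for how many sequences to inspect. A threshold derived from errors on data known to be normal would avoid flagging clean series, at the cost of needing such data.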


Example 2: Using SequenceAnomalyScoreLayer (Conceptual)

The SequenceAnomalyScoreLayer is designed to be integrated into a larger model. It takes learned features (e.g., from an encoder or attention layers) as input and outputs a scalar anomaly score. Training this layer requires a custom setup with an appropriate loss function, not shown in this isolated example.

Step 2.1: Imports and Setup

import tensorflow as tf
from fusionlab.nn.anomaly_detection import SequenceAnomalyScoreLayer
print("\nLibraries imported for SequenceAnomalyScoreLayer example.")

Step 2.2: Instantiate and Use the Layer

# Assume 'learned_features' is the output of a preceding layer
# (e.g., aggregated output of XTFT's attention/LSTM blocks)
# Shape: (Batch, FeatureDim)
batch_size_sas = 16
feature_dim_sas = 64
learned_features_sas = tf.random.normal(
    (batch_size_sas, feature_dim_sas), dtype=tf.float32
    )

# Instantiate the scoring layer
anomaly_scorer_layer = SequenceAnomalyScoreLayer(
    hidden_units=32, # Hidden units in the scorer's internal MLP
    activation='relu',
    dropout_rate=0.1,
    final_activation='linear' # Output an unbounded score
)

# Pass features through the layer to get scores
# (This is typically done within a main model's call method)
anomaly_scores_output = anomaly_scorer_layer(
    learned_features_sas, training=False
    )

print(f"\nInput features shape: {learned_features_sas.shape}")
print(f"Output anomaly scores shape: {anomaly_scores_output.shape}")
# Expected output: (Batch, 1) -> (16, 1)
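
The comment on hidden_units suggests the layer wraps a small MLP. Purely as an illustration of that idea (the layer's actual internals are not shown in this section), a feature-to-score head reduces to the following forward pass in NumPy, here with random, untrained weights.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size, feature_dim, hidden_units = 16, 64, 32

features = rng.normal(size=(batch_size, feature_dim))

# Untrained random weights (hypothetical, for shape illustration only)
w1 = rng.normal(scale=0.1, size=(feature_dim, hidden_units))
b1 = np.zeros(hidden_units)
w2 = rng.normal(scale=0.1, size=(hidden_units, 1))
b2 = np.zeros(1)

hidden = np.maximum(features @ w1 + b1, 0.0)  # ReLU hidden layer
# Dropout is inactive at inference time, so it is omitted here
scores = hidden @ w2 + b2                     # linear output: unbounded score

print(scores.shape)  # (16, 1)
```

With untrained weights the scores carry no meaning. They only become anomaly scores once a loss ties them to labels or to an unsupervised objective.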

Note

The SequenceAnomalyScoreLayer needs to be trained as part of a larger model. The loss function would guide what these scores represent (e.g., using anomaly labels if available, or incorporating it into an unsupervised objective). This example only shows the forward pass. Refer to the XTFT ‘feature_based’ strategy discussion in the User Guide for conceptual integration.
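
If anomaly labels are available, one way to supervise the scores is to treat the unbounded outputs as logits and train with binary cross-entropy. The sketch below computes that loss in NumPy on invented scores and labels. It is an assumption about one possible setup, not a description of how XTFT integrates the layer.

```python
import numpy as np

def bce_with_logits(logits, labels):
    """Numerically stable binary cross-entropy, averaged over the batch."""
    logits = np.asarray(logits, dtype=np.float64)
    labels = np.asarray(labels, dtype=np.float64)
    # Stable form: max(x, 0) - x * y + log(1 + exp(-|x|))
    per_sample = (np.maximum(logits, 0.0) - logits * labels
                  + np.log1p(np.exp(-np.abs(logits))))
    return per_sample.mean()

scores = np.array([[-3.0], [2.5], [-1.0], [4.0]])  # hypothetical layer outputs
labels = np.array([[0.0], [1.0], [0.0], [1.0]])    # 1 = anomalous sequence

loss_agree = bce_with_logits(scores, labels)
loss_contradict = bce_with_logits(-scores, labels)  # same scores, sign flipped
print(f"loss when scores agree with labels:      {loss_agree:.3f}")
print(f"loss when scores contradict the labels:  {loss_contradict:.3f}")
```

Minimising this loss pushes scores up for labelled anomalies and down for normal sequences, which is what gives the layer's output its interpretation.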


Example 3: Using PredictionErrorAnomalyScore

The PredictionErrorAnomalyScore layer calculates an anomaly score based directly on the difference (error) between true values and a model’s predicted values for a sequence.

Step 3.1: Imports and Setup

import tensorflow as tf
import numpy as np # For a more visible error injection
from fusionlab.nn.anomaly_detection import PredictionErrorAnomalyScore
print("\nLibraries imported for PredictionErrorAnomalyScore example.")

Step 3.2: Instantiate and Use the Layer

# Config
batch_size_peas = 4
time_steps_peas = 10
features_peas = 1 # Univariate example

# Dummy true values (e.g., from a dataset)
y_true_peas = tf.random.normal(
    (batch_size_peas, time_steps_peas, features_peas), dtype=tf.float32
    )
# Dummy predicted values (e.g., from a forecasting model)
# Add some noise to simulate prediction errors
y_pred_peas_np = y_true_peas.numpy() + np.random.normal(
    scale=0.6, size=y_true_peas.shape
    ).astype(np.float32)
# Inject a larger error for one sample to see difference in 'max' aggregation
y_pred_peas_np[1, 5, 0] += 5.0 # Add large error to sample 1, step 5
y_pred_peas = tf.constant(y_pred_peas_np)


# --- Instantiate with MAE and Mean Aggregation ---
error_scorer_mean = PredictionErrorAnomalyScore(
    error_metric='mae',     # Use Mean Absolute Error
    aggregation='mean'    # Average errors across time steps
)
# Calculate scores (average error per sequence)
anomaly_scores_mean = error_scorer_mean([y_true_peas, y_pred_peas])

# --- Instantiate with MAE and Max Aggregation ---
error_scorer_max = PredictionErrorAnomalyScore(
    error_metric='mae',     # Use Mean Absolute Error
    aggregation='max'     # Take max error across time steps
)
# Calculate scores (max error per sequence)
anomaly_scores_max = error_scorer_max([y_true_peas, y_pred_peas])

print(f"\nInput y_true shape: {y_true_peas.shape}")
print(f"Input y_pred shape: {y_pred_peas.shape}")
print("\n--- MAE + Mean Aggregation ---")
print(f"Output anomaly scores shape: {anomaly_scores_mean.shape}")
print(f"Example Scores (Mean Error per sequence): \n"
      f"{anomaly_scores_mean.numpy().flatten()}")
print("\n--- MAE + Max Aggregation ---")
print(f"Output anomaly scores shape: {anomaly_scores_max.shape}")
print(f"Example Scores (Max Error per sequence): \n"
      f"{anomaly_scores_max.numpy().flatten()}")
# Expected output shapes for scores: (Batch, 1) -> (4, 1)
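
To see what the two aggregation modes compute, here is the same kind of reduction in NumPy with hand-picked values. It assumes that 'mae' means the absolute error per time step and that aggregation runs over the time and feature axes. This matches the (Batch, 1) output shape noted above but is otherwise an assumption about the layer.

```python
import numpy as np

y_true = np.zeros((2, 4, 1))
y_pred = np.array([
    [[0.5], [0.5], [0.5], [0.5]],  # small error at every step
    [[0.0], [0.0], [4.0], [0.0]],  # one large error, otherwise perfect
])

abs_err = np.abs(y_true - y_pred)                      # shape (2, 4, 1)
score_mean = abs_err.mean(axis=(1, 2)).reshape(-1, 1)  # shape (2, 1)
score_max = abs_err.max(axis=(1, 2)).reshape(-1, 1)    # shape (2, 1)

print(score_mean.ravel())  # [0.5 1. ]
print(score_max.ravel())   # [0.5 4. ]
```

Mean aggregation puts the two sequences within a factor of two of each other, while max aggregation separates them by a factor of eight. An isolated spike is therefore easier to pick out with 'max', and a sustained drift with 'mean'.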

Note

The scores from PredictionErrorAnomalyScore can be used to construct a loss term that penalizes large prediction deviations, aligning with the ‘prediction_based’ anomaly detection strategy in XTFT.
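
As a rough illustration of that idea, the per-sequence scores can be averaged and added to the forecasting loss with a weight. The parameter name anomaly_weight and the exact form of the sum are assumptions made for this sketch; XTFT's own loss may be constructed differently.

```python
import numpy as np

def combined_loss(y_true, y_pred, anomaly_weight=0.1):
    """Forecast MSE plus a weighted penalty on per-sequence max absolute error."""
    mse = np.mean((y_true - y_pred) ** 2)
    scores = np.abs(y_true - y_pred).max(axis=(1, 2))  # one score per sequence
    return mse + anomaly_weight * scores.mean()

y_true = np.zeros((2, 4, 1))
y_pred = np.zeros((2, 4, 1))
y_pred[1, 2, 0] = 4.0  # a single large deviation in the second sequence

print(combined_loss(y_true, y_pred, anomaly_weight=0.0))  # 2.0 (plain MSE)
print(combined_loss(y_true, y_pred, anomaly_weight=0.1))  # 2.2
```

Setting the weight to zero recovers the plain forecasting loss, so the weight controls how strongly large deviations are penalised on top of the usual objective.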