Anomaly Detection Examples
This section of the gallery provides practical examples of using the
anomaly detection components available in fusionlab-learn. These
components can be used to build standalone anomaly detection models or
be integrated into larger forecasting frameworks like
XTFT to make them anomaly-aware.
We will cover:
Using LSTMAutoencoderAnomaly for reconstruction-based anomaly detection.
Conceptual usage of SequenceAnomalyScoreLayer for feature-based anomaly scoring.
Using PredictionErrorAnomalyScore to derive anomaly scores from prediction errors.
Prerequisites
Ensure you have fusionlab-learn and its dependencies installed.
For some examples, matplotlib and scikit-learn are also useful.
pip install fusionlab-learn matplotlib scikit-learn
Example 1: LSTM Autoencoder for Anomaly Detection
The LSTMAutoencoderAnomaly
learns to reconstruct normal time series sequences. Anomalous sequences
are expected to have higher reconstruction errors.
Workflow:
Generate synthetic time series data with injected anomalies.
Preprocess the data (scaling, sequence creation).
Define, compile, and train the LSTMAutoencoderAnomaly model.
Calculate reconstruction errors on all sequences.
Identify anomalies by thresholding the reconstruction errors.
Visualize the results.
Step 1.1: Imports and Setup
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
import warnings
import os

# FusionLab imports
from fusionlab.nn.anomaly_detection import LSTMAutoencoderAnomaly
from fusionlab.nn.utils import create_sequences  # For sequence prep

# Suppress warnings and TF logs
warnings.filterwarnings('ignore')
tf.get_logger().setLevel('ERROR')
if hasattr(tf, 'autograph'):
    tf.autograph.set_verbosity(0)

output_dir_ad = "./anomaly_detection_gallery_output"
os.makedirs(output_dir_ad, exist_ok=True)
print("Libraries imported for LSTM Autoencoder example.")
Step 1.2: Generate Data with Anomalies
# Create a sine wave with some noise
time = np.arange(0, 200, 0.5)
signal = np.sin(time * 0.1) + np.random.normal(0, 0.1, len(time))

# Inject anomalies
signal_with_anomalies = signal.copy()
signal_with_anomalies[50:60] += 2.5    # Spike up
signal_with_anomalies[150:155] -= 2.0  # Dip down

df_ad = pd.DataFrame({'Timestamp': time, 'Value': signal_with_anomalies})
print(f"Generated data shape: {df_ad.shape}")

# Visualize the data
plt.figure(figsize=(12, 4))
plt.plot(df_ad['Timestamp'], df_ad['Value'], label='Signal with Anomalies')
plt.title('Synthetic Time Series with Injected Anomalies')
plt.xlabel('Time'); plt.ylabel('Value')
plt.legend(); plt.grid(True)
# plt.savefig(os.path.join(output_dir_ad, "ad_data_with_anomalies.png"))
plt.show()
Step 1.3: Preprocessing and Sequence Creation
scaler_ad = StandardScaler()
df_ad['ScaledValue'] = scaler_ad.fit_transform(df_ad[['Value']])

sequence_length_ad = 20  # Length of input sequences for autoencoder
X_sequences, _ = create_sequences(
    df_ad[['ScaledValue']],
    sequence_length=sequence_length_ad,
    target_col='ScaledValue',
    forecast_horizon=0,
    drop_last=False,
    verbose=0
)
# An autoencoder reconstructs its own input, so the targets equal the inputs
y_sequences = X_sequences.copy()

X_train_ad = X_sequences.reshape(
    X_sequences.shape[0], sequence_length_ad, 1
).astype(np.float32)
y_train_ad = y_sequences.reshape(
    y_sequences.shape[0], sequence_length_ad, 1
).astype(np.float32)

print(f"\nTraining sequences (X) shape: {X_train_ad.shape}")
print(f"Target sequences (y) shape: {y_train_ad.shape}")
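The internals of create_sequences are not shown in this gallery. As a rough mental model only, and assuming overlapping windows shifted by one step, the windowing can be sketched in plain NumPy. The helper name make_windows is hypothetical and not part of fusionlab-learn.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def make_windows(values, sequence_length):
    """Return overlapping windows shaped (n_windows, sequence_length, 1)."""
    values = np.asarray(values, dtype=np.float32).ravel()
    if len(values) < sequence_length:
        raise ValueError("series is shorter than sequence_length")
    # Each row is one window; consecutive windows are shifted by one step.
    windows = sliding_window_view(values, sequence_length)
    # Add a trailing feature axis (univariate data) and copy so the
    # result is a regular, writable array rather than a strided view.
    return windows[..., np.newaxis].copy()


series = np.arange(10, dtype=np.float32)
windows = make_windows(series, sequence_length=4)
print(windows.shape)       # (7, 4, 1)
print(windows[0].ravel())  # [0. 1. 2. 3.]
```

A series of length n yields n - sequence_length + 1 windows under this assumption, which is why the mapping back to time points in Step 1.5 uses the index of each window's last step.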
Step 1.4: Define, Compile, and Train LSTM Autoencoder
Here we define the LSTMAutoencoderAnomaly model. If a bidirectional encoder or a bottleneck dense layer changes the dimension of the encoder’s final state, that state is projected to match the decoder’s LSTM units before it is used as the decoder’s initial state.
latent_dim_ad = 8
lstm_units_ad = 16  # Units per LSTM layer
n_features_ad = 1

autoencoder_model = LSTMAutoencoderAnomaly(
    latent_dim=latent_dim_ad,
    lstm_units=lstm_units_ad,        # Decoder LSTMs will have this many units
    num_encoder_layers=1,
    num_decoder_layers=1,
    n_features=n_features_ad,
    n_repeats=sequence_length_ad,
    use_bidirectional_encoder=True,  # Exercises the state projection described above
    use_bottleneck_dense=False,      # No bottleneck dense layer in this example
    name="lstm_autoencoder_anomaly_detector"
)

autoencoder_model.compile(optimizer='adam', loss='mse')
print("\nLSTM Autoencoder model compiled.")

print("Training LSTM Autoencoder...")
# Build the model with the input shape before fitting.
# This ensures all layers, including conditional ones in build, are created.
autoencoder_model.build(input_shape=(None, sequence_length_ad, n_features_ad))
# autoencoder_model.summary()  # Optional: view model structure

history_ad = autoencoder_model.fit(
    X_train_ad, y_train_ad,
    epochs=20,
    batch_size=32,
    shuffle=True,
    verbose=0
)
print("Training complete.")
if history_ad and history_ad.history.get('loss'):
    print(f"Final training loss: {history_ad.history['loss'][-1]:.4f}")
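The state projection mentioned at the start of this step comes down to a shape mismatch. The NumPy sketch below illustrates it under the assumption that the forward and backward final states are concatenated, which doubles the state width. It does not reproduce the library's internals, and the projection weights W and b are random placeholders standing in for a learned dense layer.

```python
import numpy as np

rng = np.random.default_rng(0)
batch, units = 4, 16

# A bidirectional encoder yields one final hidden state per direction.
h_forward = rng.normal(size=(batch, units))
h_backward = rng.normal(size=(batch, units))

# Assumption: the two states are concatenated, giving width 2 * units.
encoder_state = np.concatenate([h_forward, h_backward], axis=-1)

# Hypothetical projection (a dense layer without activation) that maps
# the state back to the width the decoder LSTM expects.
W = rng.normal(scale=0.1, size=(2 * units, units))
b = np.zeros(units)
decoder_init_state = encoder_state @ W + b

print(encoder_state.shape)       # (4, 32)
print(decoder_init_state.shape)  # (4, 16)
```

Without such a projection, a decoder LSTM with 16 units could not accept a 32-wide initial state.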
Step 1.5: Calculate Reconstruction Errors (Anomaly Scores)
reconstructions_ad = autoencoder_model.predict(X_train_ad, verbose=0)
reconstruction_errors = autoencoder_model.compute_reconstruction_error(
    X_train_ad, reconstructions_ad
)
print(f"\nReconstruction errors shape: {reconstruction_errors.shape}")

# Assign each sequence's error to the time point of its last step
anomaly_scores_ts = np.full(len(df_ad), np.nan)
for i, error_val in enumerate(reconstruction_errors):
    if i + sequence_length_ad - 1 < len(anomaly_scores_ts):  # Boundary check
        anomaly_scores_ts[i + sequence_length_ad - 1] = error_val
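The exact formula behind compute_reconstruction_error is not shown here. The plot label in Step 1.6 refers to MSE, so a plausible reading is one mean squared error per sequence, averaged over time steps and features. The sketch below implements that assumption in NumPy; reconstruction_mse is a hypothetical helper, not a library function.

```python
import numpy as np


def reconstruction_mse(x, x_hat):
    """Mean squared error per sequence for arrays shaped (batch, time, features)."""
    x = np.asarray(x, dtype=np.float64)
    x_hat = np.asarray(x_hat, dtype=np.float64)
    # Average the squared differences over the time and feature axes,
    # leaving one score per sequence.
    return np.mean((x - x_hat) ** 2, axis=(1, 2))


x = np.zeros((3, 5, 1))
x_hat = np.zeros((3, 5, 1))
x_hat[1] += 2.0  # the second sequence is reconstructed poorly
errors = reconstruction_mse(x, x_hat)
print(errors)  # [0. 4. 0.]
```

A sequence the model reconstructs badly receives a visibly larger score, which is what the thresholding in the next step relies on.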
Step 1.6: Identify Anomalies and Visualize
# Ensure a NumPy array so that boolean masking works even if a tensor was returned
reconstruction_errors = np.asarray(reconstruction_errors)
# Filter out NaNs from reconstruction_errors before calculating percentile
valid_errors = reconstruction_errors[~np.isnan(reconstruction_errors)]
if len(valid_errors) > 0:
    threshold_ad = np.percentile(valid_errors, 95)
    print(f"Anomaly threshold (95th percentile of errors): {threshold_ad:.4f}")
    anomalous_indices = np.where(reconstruction_errors > threshold_ad)[0]
    # Map each flagged sequence to the time point of its last step
    anomalous_time_points = [
        idx + sequence_length_ad - 1 for idx in anomalous_indices
        if idx + sequence_length_ad - 1 < len(df_ad)  # Boundary check
    ]
else:
    print("No valid reconstruction errors to calculate threshold.")
    threshold_ad = np.inf  # Set to infinity if no errors
    anomalous_time_points = []


plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(df_ad['Timestamp'], df_ad['Value'], label='Original Signal')
if anomalous_time_points:
    plt.scatter(df_ad['Timestamp'].iloc[anomalous_time_points],
                df_ad['Value'].iloc[anomalous_time_points],
                color='red', label='Detected Anomalies', marker='o', s=50, zorder=5)
plt.title('Signal with Detected Anomalies (LSTM Autoencoder)')
plt.ylabel('Value'); plt.legend(); plt.grid(True)

plt.subplot(2, 1, 2)
plt.plot(df_ad['Timestamp'], anomaly_scores_ts,
         label='Reconstruction Error (Anomaly Score)', color='orange')
if np.isfinite(threshold_ad):  # Only plot threshold if it's finite
    plt.axhline(threshold_ad, color='red', linestyle='--',
                label=f'Anomaly Threshold ({threshold_ad:.2f})')
plt.title('Anomaly Scores Over Time')
plt.xlabel('Time'); plt.ylabel('Reconstruction Error (MSE)')
plt.legend(); plt.grid(True)
plt.tight_layout()
# plt.savefig(os.path.join(output_dir_ad, "ad_lstm_ae_results.png"))
plt.show()
Example Output Plot (LSTM Autoencoder):
Top: Original signal with detected anomalies. Bottom: Reconstruction error over time with the anomaly threshold.
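The thresholding logic from Step 1.6 can also be isolated into a small function that runs without a trained model. This is a minimal sketch on made-up error values. The name flag_anomalies and the toy numbers are illustrative only, and the 95th percentile is the same heuristic cut-off used above, not a tuned value.

```python
import numpy as np


def flag_anomalies(errors, sequence_length, n_points, percentile=95.0):
    """Return (threshold, time indices) for windows whose error exceeds the percentile."""
    errors = np.asarray(errors, dtype=np.float64)
    valid = errors[~np.isnan(errors)]
    if valid.size == 0:
        # No usable scores: nothing can be flagged.
        return np.inf, np.array([], dtype=int)
    threshold = np.percentile(valid, percentile)
    window_idx = np.where(errors > threshold)[0]
    # Attribute each window's error to the time point of its last step.
    time_idx = window_idx + sequence_length - 1
    return threshold, time_idx[time_idx < n_points]


errors = np.array([0.1, 0.2, 0.1, 5.0, 0.2, 0.1, 0.3, 0.1, 0.2, 0.1])
threshold, flagged = flag_anomalies(errors, sequence_length=3, n_points=12)
print(flagged)  # [5]
```

Because a percentile threshold always flags a fixed share of the windows, it reports some points as anomalous even on clean data. Treat the flagged points as candidates for inspection.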
Example 2: Using SequenceAnomalyScoreLayer (Conceptual)
The SequenceAnomalyScoreLayer
is designed to be integrated into a larger model. It takes learned
features (e.g., from an encoder or attention layers) as input and
outputs a scalar anomaly score. Training this layer requires a custom
setup with an appropriate loss function, not shown in this isolated
example.
Step 2.1: Imports and Setup
import tensorflow as tf
from fusionlab.nn.anomaly_detection import SequenceAnomalyScoreLayer
print("\nLibraries imported for SequenceAnomalyScoreLayer example.")
Step 2.2: Instantiate and Use the Layer
# Assume 'learned_features' is the output of a preceding layer
# (e.g., aggregated output of XTFT's attention/LSTM blocks)
# Shape: (Batch, FeatureDim)
batch_size_sas = 16
feature_dim_sas = 64
learned_features_sas = tf.random.normal(
    (batch_size_sas, feature_dim_sas), dtype=tf.float32
)

# Instantiate the scoring layer
anomaly_scorer_layer = SequenceAnomalyScoreLayer(
    hidden_units=32,           # Hidden units in the scorer's internal MLP
    activation='relu',
    dropout_rate=0.1,
    final_activation='linear'  # Output an unbounded score
)

# Pass features through the layer to get scores
# (This is typically done within a main model's call method)
anomaly_scores_output = anomaly_scorer_layer(
    learned_features_sas, training=False
)

print(f"\nInput features shape: {learned_features_sas.shape}")
print(f"Output anomaly scores shape: {anomaly_scores_output.shape}")
# Expected output: (Batch, 1) -> (16, 1)
Note
The SequenceAnomalyScoreLayer needs to be trained as part of a larger model. The loss function would guide what these scores represent (e.g., using anomaly labels if available, or incorporating it into an unsupervised objective). This example only shows the forward pass. Refer to the XTFT ‘feature_based’ strategy discussion in the User Guide for conceptual integration.
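To make the note concrete, here is one hypothetical way a loss could give the scores meaning when anomaly labels are available: treat the unbounded scores as logits and apply binary cross-entropy. This is an illustration in NumPy, not the objective used by XTFT or fusionlab-learn, and bce_from_logits is an assumed helper name.

```python
import numpy as np


def bce_from_logits(scores, labels):
    """Numerically stable binary cross-entropy on unbounded scores (logits)."""
    x = np.asarray(scores, dtype=np.float64).ravel()
    y = np.asarray(labels, dtype=np.float64).ravel()
    # Stable form of -[y*log(sigmoid(x)) + (1-y)*log(1-sigmoid(x))].
    return float(np.mean(np.maximum(x, 0.0) - x * y + np.log1p(np.exp(-np.abs(x)))))


labels = np.array([0, 1, 0])              # 1 marks a labelled anomaly
good_scores = np.array([[-3.0], [4.0], [-2.0]])
bad_scores = np.array([[3.0], [-4.0], [2.0]])

loss_good = bce_from_logits(good_scores, labels)
loss_bad = bce_from_logits(bad_scores, labels)
print(loss_good < loss_bad)  # True
```

Under such a loss, a high score comes to mean "likely anomalous" because the labels say so. With a different objective, the same layer's output would carry a different meaning.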
Example 3: Using PredictionErrorAnomalyScore
The PredictionErrorAnomalyScore
layer calculates an anomaly score based directly on the difference
(error) between true values and a model’s predicted values for a sequence.
Step 3.1: Imports and Setup
import tensorflow as tf
import numpy as np  # For a more visible error injection
from fusionlab.nn.anomaly_detection import PredictionErrorAnomalyScore
print("\nLibraries imported for PredictionErrorAnomalyScore example.")
Step 3.2: Instantiate and Use the Layer
# Config
batch_size_peas = 4
time_steps_peas = 10
features_peas = 1  # Univariate example

# Dummy true values (e.g., from a dataset)
y_true_peas = tf.random.normal(
    (batch_size_peas, time_steps_peas, features_peas), dtype=tf.float32
)
# Dummy predicted values (e.g., from a forecasting model)
# Add some noise to simulate prediction errors
y_pred_peas_np = y_true_peas.numpy() + np.random.normal(
    scale=0.6, size=y_true_peas.shape
).astype(np.float32)
# Inject a larger error for one sample to see difference in 'max' aggregation
y_pred_peas_np[1, 5, 0] += 5.0  # Add large error to sample 1, step 5
y_pred_peas = tf.constant(y_pred_peas_np)


# --- Instantiate with MAE and Mean Aggregation ---
error_scorer_mean = PredictionErrorAnomalyScore(
    error_metric='mae',  # Use Mean Absolute Error
    aggregation='mean'   # Average errors across time steps
)
# Calculate scores (average error per sequence)
anomaly_scores_mean = error_scorer_mean([y_true_peas, y_pred_peas])

# --- Instantiate with MAE and Max Aggregation ---
error_scorer_max = PredictionErrorAnomalyScore(
    error_metric='mae',  # Use Mean Absolute Error
    aggregation='max'    # Take max error across time steps
)
# Calculate scores (max error per sequence)
anomaly_scores_max = error_scorer_max([y_true_peas, y_pred_peas])

print(f"\nInput y_true shape: {y_true_peas.shape}")
print(f"Input y_pred shape: {y_pred_peas.shape}")
print("\n--- MAE + Mean Aggregation ---")
print(f"Output anomaly scores shape: {anomaly_scores_mean.shape}")
print(f"Example Scores (Mean Error per sequence): \n"
      f"{anomaly_scores_mean.numpy().flatten()}")
print("\n--- MAE + Max Aggregation ---")
print(f"Output anomaly scores shape: {anomaly_scores_max.shape}")
print(f"Example Scores (Max Error per sequence): \n"
      f"{anomaly_scores_max.numpy().flatten()}")
# Expected output shapes for scores: (Batch, 1) -> (4, 1)
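The difference between the two aggregation modes is easiest to see on numbers small enough to check by hand. The sketch below is a NumPy approximation, assuming absolute errors are averaged over features and then reduced over time steps. It is not the layer's actual implementation, and error_scores is a hypothetical helper.

```python
import numpy as np


def error_scores(y_true, y_pred, aggregation="mean"):
    """Per-sequence score from absolute errors; inputs are (batch, time, features)."""
    abs_err = np.abs(np.asarray(y_true) - np.asarray(y_pred)).mean(axis=-1)
    if aggregation == "mean":
        scores = abs_err.mean(axis=1)  # average error across time steps
    elif aggregation == "max":
        scores = abs_err.max(axis=1)   # worst single time step
    else:
        raise ValueError(f"unknown aggregation: {aggregation!r}")
    return scores[:, np.newaxis]       # shape (batch, 1)


y_true = np.zeros((2, 4, 1))
y_pred = np.zeros((2, 4, 1))
y_pred[1, 2, 0] = 4.0  # one large error in the second sequence

mean_scores = error_scores(y_true, y_pred, "mean")
max_scores = error_scores(y_true, y_pred, "max")
print(mean_scores.ravel())  # [0. 1.]
print(max_scores.ravel())   # [0. 4.]
```

Mean aggregation dilutes an isolated spike across the sequence length, while max aggregation preserves it. This is why the example above injects a single large error to separate the two.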
Note
The scores from PredictionErrorAnomalyScore can be used to
construct a loss term that penalizes large prediction deviations,
aligning with the ‘prediction_based’ anomaly detection strategy
in XTFT.
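As a rough illustration of the note above, a loss term built from these scores could be a weighted penalty added to the forecasting loss. The form below is an assumption made for illustration, not the XTFT implementation. The names total_loss and anomaly_weight are hypothetical.

```python
import numpy as np


def total_loss(forecast_loss, anomaly_scores, anomaly_weight=0.1):
    """Forecast loss plus a weighted mean of per-sequence anomaly scores."""
    anomaly_penalty = float(np.mean(anomaly_scores))
    return forecast_loss + anomaly_weight * anomaly_penalty


scores = np.array([[0.2], [0.4], [3.0], [0.4]])  # shape (batch, 1)
loss = total_loss(forecast_loss=0.5, anomaly_scores=scores, anomaly_weight=0.1)
print(round(loss, 3))  # 0.6
```

The weight controls how strongly large prediction deviations are penalized relative to the base forecasting objective. Setting it to zero recovers the plain forecast loss.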