Data Preparation Workflow

Preparing time series data correctly is crucial for training effective forecasting models like TFT and XTFT. This example demonstrates a typical workflow using utilities from fusionlab to transform raw time series data into the structured sequence format required by these models.

We will cover the following steps:

  1. Imports and Configuration Setup.

  2. Loading raw data.

  3. Basic cleaning and datetime validation.

  4. Generating time series features (lags, rolling stats, etc.).

  5. (Optional) Feature Selection.

  6. Defining feature sets and scaling numerical features.

  7. Reshaping the data into sequences using reshape_xtft_data.

  8. Splitting the sequences into training, validation, and test sets.

  9. (Optional) Saving the processed data.

Step 1: Imports and Configuration

First, we import the necessary libraries: pandas for data handling, numpy for numerical operations, StandardScaler from scikit-learn for feature scaling, joblib for saving artifacts, and the relevant utilities from fusionlab. We also set up an output directory and suppress warnings.

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
# from sklearn.model_selection import train_test_split # Not used directly here
import joblib # For saving scalers
import os

# from fusionlab.core.io import read_data # Example loader (or use pd.read_csv)
# from fusionlab.utils.io import save_job # For robust saver
from fusionlab.nn.utils import reshape_xtft_data
from fusionlab.utils.ts_utils import (
    to_dt,
    ts_engineering,
    # select_and_reduce_features # Import if using this optional step
)

# Suppress warnings for cleaner output
import warnings
warnings.filterwarnings('ignore')

# --- Configuration ---
output_dir = "./data_prep_output" # Directory to save artifacts
os.makedirs(output_dir, exist_ok=True)
print(f"Artifacts will be saved to: {output_dir}")

Step 2: Load Raw Data

Load your initial dataset, typically from a CSV file or database. For this example, we generate synthetic multi-item data similar to previous examples. Replace this block with your actual data loading code.

# Replace with: df_raw = pd.read_csv("path/to/your/raw_data.csv")
n_items = 3
n_timesteps = 48 # 4 years of monthly data
date_rng = pd.date_range(start='2018-01-01', periods=n_timesteps, freq='MS')
df_list = []
for item_id in range(n_items):
    time = np.arange(n_timesteps)
    # Trend + yearly seasonality + per-timestep noise
    sales = (
        100 + item_id * 50 + time * (2 + item_id) +
        30 * np.sin(2 * np.pi * time / 12) +
        np.random.normal(0, 15, n_timesteps)
    )
    # Seasonal temperature; noise is drawn per timestep, not as one scalar
    temp = (
        15 + 10 * np.sin(2 * np.pi * (time % 12) / 12 + np.pi) +
        np.random.normal(0, 2, n_timesteps)
    )
    promo = np.random.randint(0, 2, n_timesteps)

    item_df = pd.DataFrame({
        'Date': date_rng, 'ItemID': item_id,
        'Temperature': temp, 'PlannedPromotion': promo, 'Sales': sales
    })
    df_list.append(item_df)
df_raw = pd.concat(df_list).reset_index(drop=True)
print(f"Loaded raw data shape: {df_raw.shape}")
print("Raw data sample:")
print(df_raw.head(3))
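With real data, it can help to confirm at this point that every item covers a regular monthly range with no duplicate or missing dates. The following is a minimal sketch on a tiny stand-in frame; the helper `check_regular_monthly` and the frame `df_demo` are hypothetical names used for illustration and are not part of fusionlab.

```python
import pandas as pd

def check_regular_monthly(df, dt_col, id_col):
    """Summarise row count, duplicate dates and missing months per item."""
    rows = []
    for item, g in df.groupby(id_col):
        dates = pd.DatetimeIndex(g[dt_col])
        # Month-start dates the item should cover between its first and last row
        expected = pd.date_range(dates.min(), dates.max(), freq="MS")
        rows.append({
            id_col: item,
            "n_rows": len(g),
            "n_duplicates": int(dates.duplicated().sum()),
            "n_missing_months": len(expected.difference(dates)),
        })
    return pd.DataFrame(rows)

# Stand-in data: item 0 is complete, item 1 is missing March 2018
dates = pd.date_range("2018-01-01", periods=6, freq="MS")
df_demo = pd.DataFrame({
    "Date": list(dates) + list(dates.delete(2)),
    "ItemID": [0] * 6 + [1] * 5,
})
summary = check_regular_monthly(df_demo, "Date", "ItemID")
print(summary)
```

A non-zero count in either of the last two columns suggests the item needs reindexing or deduplication before lags and windows are built.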

Step 3: Initial Cleaning & Validation

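Ensure the time column is in the correct datetime format using to_dt(). Then handle any initial missing values with a strategy suited to your data (here, a forward fill via ffill()). The synthetic data contains no gaps, so the fill is a no-op in this example; on real multi-item data, a forward fill over the whole DataFrame can carry the last value of one item into the first rows of the next, so filling within each item is usually safer.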

dt_col = 'Date'
# Ensure datetime column is correct format
df_clean = to_dt(df_raw, dt_col=dt_col, error='raise')

# Handle missing values (example: forward fill)
df_clean = df_clean.ffill()
print("\nPerformed initial cleaning (datetime check, ffill).")
print("Cleaned data sample:")
print(df_clean.head(3))
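For comparison, the same cleaning step can be sketched with plain pandas, filling within each item rather than across the whole frame. This is an illustrative alternative on a small stand-in frame (`df_demo` is a hypothetical name), not a description of what to_dt() does internally.

```python
import numpy as np
import pandas as pd

df_demo = pd.DataFrame({
    "Date": ["2018-01-01", "2018-02-01", "2018-01-01", "2018-02-01"],
    "ItemID": [0, 0, 1, 1],
    "Sales": [100.0, np.nan, np.nan, 180.0],
})

# Parse strictly: an unparseable date raises instead of silently becoming NaT
df_demo["Date"] = pd.to_datetime(df_demo["Date"], errors="raise")

# Sort so that the forward fill follows time order within each item
df_demo = df_demo.sort_values(["ItemID", "Date"]).reset_index(drop=True)

# Forward fill within each item only; values never cross item boundaries
df_demo["Sales"] = df_demo.groupby("ItemID")["Sales"].ffill()
print(df_demo)
```

The first row of item 1 stays missing because it has no earlier value within its own item; a frame-wide ffill() would have filled it with item 0's last value.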

Step 4: Feature Engineering

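Generate additional time series features using ts_engineering(). It can create lag features, rolling window statistics, and calendar features (such as year, month, and day of week). The first rows lack enough history for the lags and rolling windows, so they contain NaNs and are typically dropped. Note that the call below receives the full multi-item DataFrame: unless ts_engineering() groups by ItemID internally, lag and rolling values at the start of one item would be computed from the previous item's rows, so it is worth checking the output or applying the function per item.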

target_col = 'Sales'
df_featured = ts_engineering(
    df=df_clean.copy(),   # Pass a copy to avoid modifying df_clean
    value_col=target_col, # Generate features based on Sales
    dt_col=dt_col,        # Use Date for time features
    lags=3,               # Create Sales_lag_1, _lag_2, _lag_3
    window=6,             # Create rolling mean/std over 6 months
    diff_order=0,         # No differencing
    apply_fourier=False,  # No Fourier features
    scaler=None           # Apply scaling later
)
# Drop rows with NaNs introduced by lags/rolling features
df_featured.dropna(inplace=True)
print("\nGenerated time series features.")
print("Shape after feature engineering and dropna:", df_featured.shape)
print("New columns added:", df_featured.columns.tolist())
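To make the lag and rolling-window idea concrete, here is a minimal pandas sketch that builds the same kinds of features separately for each item. It is not the implementation of ts_engineering(); the helper `add_lag_rolling` is hypothetical, and the column names simply mirror the ones assumed elsewhere in this example.

```python
import numpy as np
import pandas as pd

def add_lag_rolling(df, value_col, group_col, lags=3, window=6):
    """Add per-group lag and rolling mean/std columns, then drop NaN rows."""
    out = df.copy()
    grouped = out.groupby(group_col)[value_col]
    for k in range(1, lags + 1):
        # shift(k) within each group: the value observed k steps earlier
        out[f"{value_col}_lag_{k}"] = grouped.shift(k)
    out[f"rolling_mean_{window}"] = grouped.transform(
        lambda s: s.rolling(window).mean())
    out[f"rolling_std_{window}"] = grouped.transform(
        lambda s: s.rolling(window).std())
    return out.dropna().reset_index(drop=True)

# Two items with 10 rows each; values make cross-item leakage easy to spot
df_demo = pd.DataFrame({
    "ItemID": np.repeat([0, 1], 10),
    "Sales": np.concatenate([np.arange(10.0), 100.0 + np.arange(10.0)]),
})
feat = add_lag_rolling(df_demo, "Sales", "ItemID")
print(feat.groupby("ItemID").size())
```

With a window of 6, each item loses its first 5 rows, because the rolling statistics need 6 observations. Every remaining lag value comes from the same item as its row.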

Step 5: Feature Selection / Reduction (Optional)

After generating many features, you might want to remove redundant ones (e.g., highly correlated lags) or reduce dimensionality. You could use select_and_reduce_features() here. For this example, we proceed with all generated features.

# --- OPTIONAL STEP ---
# exclude_cols = [dt_col, 'ItemID'] # Keep identifiers
# df_selected, _ = select_and_reduce_features(
#     df=df_featured,
#     target_col=target_col, exclude_cols=exclude_cols,
#     method='corr', corr_threshold=0.95, verbose=0
# )
# print("\nApplied optional feature selection (if uncommented).")
# --- END OPTIONAL STEP ---

# Use all features generated in Step 4 for this workflow
df_selected = df_featured
print("\nSkipped optional feature selection step.")
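As an illustration of correlation-based pruning, the sketch below uses a common pandas idiom: compute the absolute correlation matrix, look only at its upper triangle, and drop any column that correlates above a threshold with an earlier one. The helper `drop_correlated` is hypothetical and is not claimed to match the behaviour of select_and_reduce_features().

```python
import numpy as np
import pandas as pd

def drop_correlated(df, candidates, threshold=0.95):
    """Drop each candidate column that is highly correlated with an earlier one."""
    corr = df[candidates].corr().abs()
    # Keep only the strict upper triangle so each pair is considered once
    mask = np.triu(np.ones(corr.shape, dtype=bool), k=1)
    upper = corr.where(mask)
    to_drop = [col for col in upper.columns if (upper[col] > threshold).any()]
    return df.drop(columns=to_drop), to_drop

rng = np.random.default_rng(0)
a = rng.normal(size=200)
df_demo = pd.DataFrame({
    "a": a,
    "b": 2.0 * a + 1.0,          # linear function of "a": perfectly correlated
    "c": rng.normal(size=200),   # independent noise
})
df_reduced, dropped = drop_correlated(df_demo, ["a", "b", "c"])
print("Dropped:", dropped)
```

Identifier columns and the target should be left out of `candidates` so that they are never removed.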

Step 6: Define Feature Sets & Scale Numerics

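Define the final lists of static, dynamic, and future columns based on the features now present in the DataFrame, checking each name against the column list printed in Step 4. Then apply scaling (e.g., StandardScaler) to the numerical features that will be fed into the neural network, and save the scaler so that predictions can be inverse-transformed later. For simplicity, the scaler here is fitted on the full dataset; for a strict evaluation, fit it on the training period only so that validation and test statistics do not leak into training.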

# Define final feature sets AFTER engineering/selection
static_cols = ['ItemID'] # Only ItemID remains truly static here
# Dynamic includes time features, lags, rolling stats, temp
dynamic_cols = [
    'Temperature', 'Sales_lag_1', 'Sales_lag_2', 'Sales_lag_3',
    'rolling_mean_6', 'rolling_std_6', 'year', 'month', 'day',
    'day_of_week', 'is_weekend', 'quarter'
    ]
# Future includes known promotions and time features known ahead
future_cols = ['PlannedPromotion', 'year', 'month', 'day',
               'day_of_week', 'is_weekend', 'quarter']
# Columns to be scaled (numerical inputs and the target)
numerical_cols = ['Temperature', target_col, # Include target
                  'Sales_lag_1', 'Sales_lag_2', 'Sales_lag_3',
                  'rolling_mean_6', 'rolling_std_6']

# Apply scaling
scaler = StandardScaler()
df_scaled = df_selected.copy()
# Scale target AND relevant numerical input features
df_scaled[numerical_cols] = scaler.fit_transform(df_scaled[numerical_cols])

# Save the scaler (important!)
scaler_path = os.path.join(output_dir, "feature_scaler.joblib")
joblib.dump(scaler, scaler_path)
print(f"\nScaled numerical features. Scaler saved to {scaler_path}")

# Note: Categorical features ('ItemID', 'month', 'PlannedPromotion',
# time features like 'day_of_week', 'is_weekend', 'quarter') are
# assumed handled by model embeddings. If not, encode them here.
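Because one scaler is fitted on several columns at once, its inverse_transform() expects all of those columns, while the model predicts only the target. One way around this is to undo the standardisation for the target alone using the scaler's stored `mean_` and `scale_` attributes. The sketch below uses stand-in data, a shortened column list, and a hypothetical helper `inverse_target`.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

# Shortened stand-in for the column list used when fitting the scaler
numerical_cols = ["Temperature", "Sales", "Sales_lag_1"]
target_col = "Sales"

rng = np.random.default_rng(0)
X = rng.normal(loc=[15.0, 200.0, 195.0], scale=[5.0, 40.0, 40.0], size=(50, 3))

scaler = StandardScaler().fit(X)
target_idx = numerical_cols.index(target_col)

def inverse_target(y_scaled, scaler, idx):
    """Undo standardisation for one column of a multi-column StandardScaler."""
    return y_scaled * scaler.scale_[idx] + scaler.mean_[idx]

# Pretend the scaled target column is the model output
y_scaled = scaler.transform(X)[:, target_idx]
y_restored = inverse_target(y_scaled, scaler, target_idx)
print(np.allclose(y_restored, X[:, target_idx]))
```

An alternative design is to fit a second, target-only scaler and save it alongside the feature scaler, which avoids tracking the column index.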

Step 7: Reshape into Sequences using reshape_xtft_data

Use the reshape_xtft_data() utility to transform the processed, scaled DataFrame into the structured NumPy arrays (static, dynamic, future, target) expected by TFT/XTFT models. This handles the rolling window creation and feature separation.

time_steps = 12         # 1 year lookback
forecast_horizons = 6   # Predict 6 months ahead
spatial_cols = ['ItemID'] # Group sequences by ItemID

print(f"\nReshaping data into sequences (T={time_steps}, H={forecast_horizons})...")
static_data, dynamic_data, future_data, target_data = reshape_xtft_data(
    df=df_scaled, # Use scaled data
    dt_col=dt_col,
    target_col=target_col, # Target column name
    dynamic_cols=dynamic_cols, # List of dynamic column names
    static_cols=static_cols,   # List of static column names
    future_cols=future_cols,   # List of future column names
    spatial_cols=spatial_cols, # List of grouping columns
    time_steps=time_steps,
    forecast_horizons=forecast_horizons,
    verbose=1 # Show resulting shapes
)
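The rolling window idea behind this step can be sketched in plain NumPy for a single series. This is a simplified illustration, not the logic of reshape_xtft_data(), whose exact windowing conventions may differ; the helper `make_windows` is hypothetical, and taking column 0 as the target is an assumption of the sketch.

```python
import numpy as np

def make_windows(values, time_steps, horizon):
    """Slice a (n_rows, n_features) array into lookback windows X and
    target windows y, where y is taken from column 0."""
    n_seq = len(values) - time_steps - horizon + 1
    if n_seq <= 0:
        raise ValueError("Series too short for the requested window sizes.")
    X = np.stack([values[i:i + time_steps] for i in range(n_seq)])
    y = np.stack([
        values[i + time_steps:i + time_steps + horizon, 0]
        for i in range(n_seq)
    ])
    return X, y

# One series with 48 rows and 2 features
series = np.arange(48 * 2, dtype=float).reshape(48, 2)
X, y = make_windows(series, time_steps=12, horizon=6)
print(X.shape, y.shape)
```

A series of n rows yields n - T - H + 1 windows, so short series produce few training samples; each item needs at least T + H rows to contribute any sequence at all.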

Step 8: Train / Validation / Test Split

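Split the generated sequence arrays into training, validation, and (optionally) test sets. Time series data typically requires a chronological split. The index-based slicing below is chronological only if the sequences are ordered by time; if reshape_xtft_data() emits them grouped by ItemID, the same slices would instead separate items, so verify the ordering first.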

# Example: 70% Train, 15% Validation, 15% Test (Chronological)
# Ensure data was sorted by time before reshaping for this split type
if static_data is not None: # Check if static data exists
    n_samples = static_data.shape[0]
elif dynamic_data is not None:
    n_samples = dynamic_data.shape[0]
else:
    raise ValueError("No data available to split.")

n_val = int(n_samples * 0.15)
n_test = int(n_samples * 0.15)
n_train = n_samples - n_val - n_test

# Perform the split for each array type
X_train_static, X_val_static, X_test_static = (
    static_data[:n_train],
    static_data[n_train:n_train + n_val],
    static_data[n_train + n_val:]
)
X_train_dynamic, X_val_dynamic, X_test_dynamic = (
    dynamic_data[:n_train],
    dynamic_data[n_train:n_train + n_val],
    dynamic_data[n_train + n_val:]
)
X_train_future, X_val_future, X_test_future = (
    future_data[:n_train],
    future_data[n_train:n_train + n_val],
    future_data[n_train + n_val:]
)
y_train, y_val, y_test = (
    target_data[:n_train],
    target_data[n_train:n_train + n_val],
    target_data[n_train + n_val:]
)

print("\nData split into Train/Validation/Test sets:")
print(f"  Train Samples : {n_train}")
print(f"  Val.  Samples : {n_val}")
print(f"  Test  Samples : {n_test}")
print(f"  Example Train Dynamic Shape: {X_train_dynamic.shape}")
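If the sequences turn out to be ordered item by item, one option is to split each item's sequences chronologically and then combine the index sets. The sketch below assumes you can obtain a group identifier per sequence (for example from the static array, if it carries ItemID) and that sequences within a group are in time order; the helper `chrono_split_by_group` is hypothetical.

```python
import numpy as np

def chrono_split_by_group(group_ids, val_frac=0.15, test_frac=0.15):
    """Return (train, val, test) index arrays, splitting each group's
    sequences in order so its latest windows go to validation and test."""
    train, val, test = [], [], []
    for g in np.unique(group_ids):
        idx = np.flatnonzero(group_ids == g)  # keeps the original order
        n = len(idx)
        n_val, n_test = int(n * val_frac), int(n * test_frac)
        n_train = n - n_val - n_test
        train.append(idx[:n_train])
        val.append(idx[n_train:n_train + n_val])
        test.append(idx[n_train + n_val:])
    return tuple(np.concatenate(part) for part in (train, val, test))

# Stand-in: 3 items with 20 sequences each, stored item by item
group_ids = np.repeat([0, 1, 2], 20)
train_idx, val_idx, test_idx = chrono_split_by_group(group_ids)
print(len(train_idx), len(val_idx), len(test_idx))
```

The resulting index arrays can be applied to every sequence array alike, e.g. `dynamic_data[train_idx]`. Adjacent windows overlap in time, so some practitioners also leave a gap between the splits.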

Step 9: Save Processed Data (Optional)

Optionally, save the final processed sequence arrays using np.savez for easy reloading during model training experiments, avoiding the need to repeat the entire preprocessing pipeline each time.

processed_data_path = os.path.join(output_dir, "processed_sequences.npz")
np.savez(
    processed_data_path,
    # Saving only train/val for brevity, add test sets if needed
    X_train_static=X_train_static, X_val_static=X_val_static,
    X_train_dynamic=X_train_dynamic, X_val_dynamic=X_val_dynamic,
    X_train_future=X_train_future, X_val_future=X_val_future,
    y_train=y_train, y_val=y_val
    # Add X_test_*, y_test if split earlier
)
print(f"\nProcessed sequence data saved to {processed_data_path}")
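Reloading the archive in a later training script is a matter of np.load(). The sketch below writes a small stand-in archive to a temporary directory so that it runs on its own; the array shapes are arbitrary placeholders.

```python
import os
import tempfile

import numpy as np

# Write a small stand-in archive so the example is self-contained
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "processed_sequences.npz")
np.savez(
    path,
    X_train_dynamic=np.zeros((4, 12, 3)),
    y_train=np.ones((4, 6, 1)),
)

# np.load returns a lazy, dict-like NpzFile; arrays are read on access
with np.load(path) as data:
    print(data.files)
    X_train_dynamic = data["X_train_dynamic"]
    y_train = data["y_train"]

print(X_train_dynamic.shape, y_train.shape)
```

If any saved entry is None (for instance when there are no static features), NumPy stores it as a pickled object array, and loading it then requires `allow_pickle=True`; leaving such entries out of the archive avoids this.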