"""
sensor_simulation.py
Module for simulating synthetic brain activity data for sensor-level measurements.
"""
import os
from pathlib import Path
import logging
from pdb import main
from typing import Optional, Tuple, Union, Dict, List, Any
import numpy as np
from numpy.random import Generator
from scipy.stats import wishart
from scipy.signal import butter, filtfilt
import mne
from mne.io.constants import FIFF
import matplotlib.pyplot as plt
import matplotlib.cm as cm # Import colormap functionality
from matplotlib.lines import Line2D # Import for custom legend
import matplotlib.gridspec as gridspec
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable # For better colorbar placement
from calibrain import LeadfieldBuilder
from calibrain.utils import load_config
[docs]
class SensorSimulator:
"""Simulates synthetic brain activity data for sensor-level measurements.
"""
[docs]
def __init__(
self,
logger: Optional[logging.Logger] = None,
):
"""
Initialize the SensorSimulator.
Parameters
----------
logger : Optional[logging.Logger]
Logger instance for logging messages. If None, a default logger will be created.
"""
self.logger = logger if logger else logging.getLogger(__name__)
# Prameters that will be set during simulation
self.sensor_units = FIFF.FIFF_UNIT_T # Default unit for MEG (mag) sensors (T)
def _project_sources_to_sensors(self, x: np.ndarray, L: np.ndarray, orientation_type: str) -> np.ndarray:
"""
Project the source activity to the sensor space using the leadfield matrix.
Parameters
----------
x : np.ndarray (nAm)
Source activity.
- 'fixed': Shape (n_sources, n_times).
- 'free': Shape (n_sources, 3, n_times).
L : np.ndarray
Leadfield matrix (µV / nAm for EEG or ft / nAm for MEG).
- 'fixed': Shape (n_sensors, n_sources).
- 'free': Shape (n_sensors, n_sources, 3).
orientation_type : str
Orientation type of the sources ('fixed' or 'free').
Returns
-------
np.ndarray
Sensor measurements (y_clean). Shape: (n_sensors, n_times). The units depends on the channel type:
- 'eeg': µV
- 'meg': fT
"""
if orientation_type == "fixed":
# Matrix multiplication: (n_sensors, n_sources) @ (n_sources, n_times) -> (n_sensors, n_times)
y = L @ x
elif orientation_type == "free":
# Einstein summation: Sum over source index 'm' and orientation index 'r'
# (n_sensors, n_sources, 3) einsum (n_sources, 3, n_times) -> (n_sensors, n_times)
y = np.einsum("nmr,mrt->nt", L, x) # Corrected einsum indices
else:
raise ValueError(f"Unsupported orientation type: {orientation_type}")
return y
def _add_noise(self, y_clean, alpha_SNR, noise_seed):
"""
Adds Homoscedastic (uniform variance across channels) and uncorrelated (white) Gaussian noise to a clean signal based on a desired SNR level.
Parameters:
y_clean : np.ndarray
The clean signal array (e.g., channels x times).
alpha_SNR : float
Desired signal-to-noise ratio between 0 and 1.
- 0.0 means full noise, no signal.
- 1.0 means no noise, only signal.
noise_seed : int
Seed for the random number generator to ensure reproducibility.
Returns:
- tuple[np.ndarray, np.ndarray, float]
- y_noisy : np.ndarray
The noisy signal with added Gaussian noise.
- noise : np.ndarray
The noise added to the clean signal.
- noise_var : float
The variance of the noise added to the clean signal.
"""
if not (0.0 <= alpha_SNR <= 1.0):
raise ValueError("alpha_SNR must be in [0, 1].")
noise_rng = np.random.RandomState(noise_seed)
# Generate base noise with variance = noise_std^2
noise_std = 0.1 # 0.01 # Standard deviation of the noise
if alpha_SNR == 1.0:
# No noise added
eps = np.zeros_like(y_clean)
return y_clean.copy(), eps, 0.0
# Sample noise from a Gaussian distribution
eps = noise_rng.normal(loc=0.0, scale=noise_std, size=y_clean.shape)
# Compute frobenius norms
signal_norm = np.linalg.norm(y_clean, ord='fro')
eps_norm = np.linalg.norm(eps, ord='fro')
if alpha_SNR == 0.0:
# Avoid division by zero, treat as full noise
# Pure noise with same norm as clean signal
eta = signal_norm / eps_norm
eps_scaled = eta * eps
return eps_scaled.copy(), eps_scaled, (eta * noise_std) ** 2
# General case: scaled noise added to clean signal
# Scale the noise to achieve the desired SNR
eta = ((1 - alpha_SNR) / alpha_SNR) * (signal_norm / eps_norm)
eps_scaled = eta * eps # Scaled noise
y_noisy = y_clean + eps_scaled # Add noise to signal
noise_var = (eta * noise_std) ** 2
# OLD APPROACH:
# signal_power = np.mean(y_clean ** 2)
# if signal_power == 0:
# print("Warning: Clean signal power is zero. Cannot add noise based on SNR.")
# noise_power = 0
# noise = np.zeros_like(y_clean)
# else:
# snr_linear = 10 ** (self.alpha_SNR / 10.0)
# # Homoscedastic case: The standard deviation and thus the variance (noise_power) is the same for all sensors and all time points.
# noise_power = signal_power / snr_linear # Variance of the noise
# noise_std = np.sqrt(noise_power)
# # Draw noise from Gaussian distribution (independenet noise at each sensor and at each time point). -> The noise covariance matrix is diagonal.
# noise = noise_rng.normal(0, noise_std, size=y_clean.shape) # White noise (uncorrelated across sensors and time) with a uniform power across sensors. shape: n_sensors x n_times.
# self.logger.info(f"Noise: {noise.shape}, noise power: {noise_power:.4f}, noise std: {noise_std:.4f}")
# y_noisy = y_clean + noise
# return y_noisy, noise, noise_power
return y_noisy, eps_scaled, noise_var
[docs]
def simulate(
self,
x_trials: List[np.ndarray],
L: np.ndarray,
orientation_type: str = "fixed",
alpha_SNR: float = 0.5,
n_trials: int = 1,
global_seed: int = 42,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Simulate sensor trials by projecting source data to sensor space and adding noise to the clean sensor data.
Parameters
----------
x_trials : List[np.ndarray]
List of source time courses for each trial. Each element should be an array of shape (n_sources, n_times) for fixed orientation or (n_sources, 3, n_times) for free orientation.
L : np.ndarray
Leadfield matrix (µV / nAm for EEG or fT / nAm for MEG).
- 'fixed': Shape (n_sensors, n_sources).
- 'free': Shape (n_sensors, 3, n_sources).
orientation_type : str
Orientation type of the sources ('fixed' or 'free').
alpha_SNR : float
Desired signal-to-noise ratio between 0 and 1.
- 0.0 means full noise, no signal.
- 1.0 means no noise, only signal.
n_trials : int
Number of trials to simulate. Default is 1.
global_seed : int
Global seed for random number generation to ensure reproducibility across trials.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
- y_clean_all_trials : np.ndarray
Clean sensor data for all trials. Shape: (n_trials, n_sensors, n_times).
- y_noisy_all_trials : np.ndarray
Noisy sensor data for all trials. Shape: (n_trials, n_sensors, n_times).
- noise_all_trials : np.ndarray
Noise added to the clean sensor data for all trials. Shape: (n_trials, n_sensors, n_times).
- noise_var_all_trials : np.ndarray
Noise variance for each trial. Shape: (n_trials,).
"""
noise_rng = np.random.RandomState(global_seed + 123456)
noise_seeds = noise_rng.randint(0, 2**32 - 1, size=n_trials)
y_clean_all_trials, y_noisy_all_trials = [], []
noise_all_trials, noise_var_all_trials = [], []
for i in range(n_trials):
x_trial = x_trials[i] # Source time courses for this trial
noise_seed = noise_seeds[i]
self.logger.debug(f"[Trial {i+1}/{n_trials}] Adding noise with seed {noise_seed}")
# Generate source time courses for this trial
y_clean = self._project_sources_to_sensors(x=x_trial, L=L, orientation_type='fixed')
# Add noise to the clean sensor data for this trial
y_noisy, noise, noise_var = self._add_noise(y_clean, alpha_SNR, noise_seed)
y_clean_all_trials.append(y_clean)
y_noisy_all_trials.append(y_noisy)
noise_all_trials.append(noise)
noise_var_all_trials.append(noise_var)
y_clean_all_trials = np.array(y_clean_all_trials) # (n_trials, n_channels, n_times)
x_trials = np.array(x_trials) # (n_trials, n_sources, [n_orient,] n_times)
y_noisy_all_trials = np.array(y_noisy_all_trials) # (n_trials, n_channels, n_times)
noise_all_trials = np.array(noise_all_trials) # (n_trials, n_channels, n_times)
noise_var_all_trials = np.array(noise_var_all_trials) # (n_trials,)
self.logger.info(f"Noise addition complete.")
self.logger.info(f"Shape of clean sensor data for all trials: {y_clean_all_trials.shape}")
self.logger.info(f"Shape of noisy sensor data for all trials: {y_noisy_all_trials.shape}")
self.logger.info(f"Shape of noise data for all trials: {noise_all_trials.shape}")
self.logger.info(f"Shape of noise variance data for all trials: {noise_var_all_trials.shape}")
# Reshape leadfield matrix for free orientation if needed by downstream estimators.
if orientation_type == "free":
self.logger.info("Reshaping free orientation leadfield from (sensors, sources, 3) to (sensors, sources*3)")
L = L.reshape(L.shape[0], -1)
return y_clean_all_trials, y_noisy_all_trials, noise_all_trials, noise_var_all_trials
[docs]
def main():
logging.basicConfig(
level=logging.INFO, # or DEBUG
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
handlers=[
logging.StreamHandler(), # Console
logging.FileHandler("simulation.log") # Optional: file output
]
)
logger = logging.getLogger("SensorSimulator")
sensor_simulator = SensorSimulator(
logger=logger,
)
# Example source activity (randomly generated for demonstration)
n_trials = 4
n_sources = 10
n_times = 1000
x_trials = np.array([np.random.randn(n_sources, n_times) for _ in range(n_trials)])
# Example leadfield matrix (randomly generated for demonstration)
n_sensors = 5
L = np.random.randn(n_sensors, n_sources)
# Simulate sensor data
y_clean, y_noisy, noise, noise_var = sensor_simulator.simulate(
x_trials,
L,
orientation_type="fixed",
alpha_SNR=0.5,
n_trials=n_trials,
global_seed=42
)
# sensor_simulator.sensor_units = "T"
logger.info(f"Clean sensor data shape: {y_clean.shape}")
logger.info(f"Noisy sensor data shape: {y_noisy.shape}")
logger.info(f"Noise shape: {noise.shape}")
logger.info(f"Noise variance shape: {noise_var.shape}")
if __name__ == "__main__":
main()