Source code for calibrain.leadfield_builder

"""
Module for simulating leadfield matrices using MNE-Python.

Provides a configurable framework for setting up and running a leadfield
simulation pipeline. Handles various MNE-Python components, such as source
space, BEM model, montage, info object, forward solution, and leadfield matrix,
allowing loading from files or creating them based on configuration.
"""

import logging
import numpy as np
from pathlib import Path
from mne.io.constants import FIFF
import mne
from typing import Dict, Optional, Union
from calibrain.utils import load_config
from sklearn.utils import check_random_state

[docs] class LeadfieldBuilder: """ Simulates leadfield matrices based on a configuration dictionary. Handles the creation or loading of necessary MNE-Python components like source space, BEM model, montage, info, and forward solution. Attributes ---------- config : dict The configuration dictionary driving the simulation process. self.logger : logging.Logger Logger instance for logging messages. data_path : Path Path to the data directory. subject : str Subject identifier. subjects_dir : Path Path to the FreeSurfer subjects directory. save_path : Path Base path where generated files (source space, BEM, etc.) will be saved. """
[docs] def __init__(self, config: Dict = None, leadfield_dir: str = None, logger: Optional[logging.Logger] = None): """ Initialize the LeadfieldBuilder. Parameters ---------- config : dict Configuration dictionary containing paths and parameters for each simulation step (data, source_space, bem_model, montage, info, forward_solution, leadfield). channel_type : str, optional Type of channels to simulate ('eeg', 'meg', or 'grad'). If None, all channel types will be simulated. Default is None. leadfield_dir : str, optional Directory where leadfield matrices are stored or will be saved. self.logger : logging.Logger, optional Custom self.logger instance. If None, a default self.logger is created, by default None. Raises ------ FileNotFoundError If `data_path` or `subjects_dir` specified in the config do not exist. """ self.config = config if self.config: data_cfg = config["data"] self.subject = data_cfg["subject"] self.data_path = Path(data_cfg["data_path"]) self.subjects_dir = Path(data_cfg["subjects_dir"]) self.save_path = Path(data_cfg["save_path"]) if not self.data_path.exists(): raise FileNotFoundError(f"Data path does not exist: {self.data_path}") if not self.subjects_dir.exists(): raise FileNotFoundError(f"Subjects directory does not exist: {self.subjects_dir}") if not self.save_path.exists(): self.logger.info(f"Save path does not exist. Creating: {self.save_path}") self.save_path.mkdir(parents=True, exist_ok=True) self.logger = logger if logger else logging.getLogger(__name__) self.leadfield_dir = Path(leadfield_dir) # Parameter set during building the leadfield self.sensor_units = None self.logger.info("LeadfieldBuilder initialized successfully.")
[docs] def handle_source_space(self) -> mne.SourceSpaces: """ Load or create the source space. Reads the source space from the file specified in `config['source_space']['fname']` if it exists. Otherwise, creates a new source space using `mne.setup_source_space` with parameters from the configuration. Saves the created source space if `config['source_space']['save']` is True. Returns ------- mne.SourceSpaces The loaded or created source space object. """ source_space_cfg = self.config["source_space"] fname = source_space_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading source space from file: {load_path}") return mne.read_source_spaces(load_path) else: self.logger.warning(f"Source space file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No source space file specified. Creating from scratch...") valid_kwargs = ["spacing", "surface", "add_dist", "n_jobs", "verbose"] kwargs = {key: value for key, value in source_space_cfg.items() if key in valid_kwargs} src = mne.setup_source_space(subject=self.subject, subjects_dir=self.subjects_dir, **kwargs) if source_space_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-src.fif" verbose = source_space_cfg.get("verbose") # Let MNE handle default if None overwrite = source_space_cfg.get("overwrite", False) # Default to False self.logger.info(f"Saving source space to file: {save_fname}") mne.write_source_spaces(save_fname, src, overwrite=overwrite, verbose=verbose) return src
[docs] def handle_bem_model(self) -> mne.bem.ConductorModel: """ Load or create the BEM (Boundary Element Model) solution. Reads the BEM solution from the file specified in `config['bem_model']['fname']` if it exists. Otherwise, creates a new BEM model using `mne.make_bem_model` and computes the solution using `mne.make_bem_solution` with parameters from the configuration. Saves the created BEM solution if `config['bem_model']['save']` is True. Returns ------- mne.bem.ConductorModel The loaded or created BEM solution object. """ bem_model_cfg = self.config["bem_model"] fname = bem_model_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading BEM model from file: {load_path}") return mne.read_bem_solution(load_path) else: self.logger.warning(f"BEM model file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No BEM model file specified. Creating from scratch...") valid_model_kwargs = ["ico", "conductivity", "verbose"] model_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_model_kwargs} bem_model = mne.make_bem_model(subject=self.subject, subjects_dir=self.subjects_dir, **model_kwargs) valid_sol_kwargs = ["solver", "verbose"] sol_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_sol_kwargs} bem = mne.make_bem_solution(surfs=bem_model, **sol_kwargs) if bem_model_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-bem.fif" verbose = bem_model_cfg.get("verbose") overwrite = bem_model_cfg.get("overwrite", False) self.logger.info(f"Saving BEM model to file: {save_fname}") mne.write_bem_solution(save_fname, bem, overwrite=overwrite, verbose=verbose) return bem
[docs] def handle_montage(self) -> mne.channels.DigMontage: """ Load or create the sensor montage. Reads the montage from the file specified in `config['montage']['fname']` if it exists. Otherwise, creates a standard montage using `mne.channels.make_standard_montage` with parameters from the configuration. Saves the created montage if `config['montage']['save']` is True. Returns ------- mne.channels.DigMontage The loaded or created montage object. """ montage_cfg = self.config.get("montage", {}) fname = montage_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading montage from file: {load_path}") # Assuming FIF format based on save method return mne.channels.read_dig_fif(load_path) else: self.logger.warning(f"Montage file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No montage file specified. Creating from scratch...") valid_kwargs = ["kind", "head_size"] # Parameters for make_standard_montage kwargs = {key: value for key, value in montage_cfg.items() if key in valid_kwargs} if not kwargs.get("kind"): self.logger.warning("Montage 'kind' not specified in config, cannot create standard montage. Returning None.") return None # Cannot create without kind montage = mne.channels.make_standard_montage(**kwargs) if montage_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-montage.fif" verbose = montage_cfg.get("verbose") overwrite = montage_cfg.get("overwrite", False) self.logger.info(f"Saving montage to file: {save_fname}") montage.save(save_fname, overwrite=overwrite, verbose=verbose) return montage
[docs] def handle_info(self) -> mne.Info: """ Load or create the MNE measurement info object. Reads the info from the file specified in `config['info']['fname']` if it exists. Otherwise, creates a new info object using `mne.create_info` with parameters from the configuration, associating it with the montage obtained via `handle_montage`. Saves the created info object if `config['info']['save']` is True. Returns ------- mne.Info The loaded or created info object. Raises ------ ValueError If info cannot be created because the montage is missing or invalid. """ info_cfg = self.config["info"] fname = info_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading info object from file: {load_path}") return mne.io.read_info(load_path) else: self.logger.warning(f"Info file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No info file specified. Creating from scratch...") montage = self.handle_montage() if montage is None or not hasattr(montage, 'ch_names'): raise ValueError("Cannot create info object without a valid montage.") valid_kwargs = ["sfreq", "ch_types", "verbose"] kwargs = {key: value for key, value in info_cfg.items() if key in valid_kwargs} if not kwargs.get("sfreq") or not kwargs.get("ch_types"): raise ValueError("Cannot create info object without 'sfreq' and 'ch_types' in config.") info = mne.create_info( ch_names=montage.ch_names, **kwargs ) info.set_montage(montage) if info_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-info.fif" # write_info doesn't have overwrite, it overwrites by default self.logger.info(f"Saving info object to file: {save_fname}") mne.io.write_info(save_fname, info) return info
[docs] def handle_forward_solution(self, info: mne.Info, src: mne.SourceSpaces, bem: mne.bem.ConductorModel) -> mne.Forward: """ Load or create the forward solution. Reads the forward solution from the file specified in `config['forward_solution']['fname']` if it exists and has a valid suffix. Otherwise, creates a new forward solution using `mne.make_forward_solution` with parameters from the configuration. Converts the solution to fixed orientation if `config['forward_solution']['orientation_type']` is 'fixed'. Saves the created forward solution if `config['forward_solution']['save']` is True. Parameters ---------- info : mne.Info The measurement info object. src : mne.SourceSpaces The source space object. bem : mne.bem.ConductorModel The BEM solution object. Returns ------- mne.Forward The loaded or created forward solution object. """ forward_solution_cfg = self.config.get("forward_solution", {}) orientation_type = forward_solution_cfg.get("orientation_type", "free") # Default to free fname = forward_solution_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): # Basic check for expected suffix if not (load_path.name.endswith("fixed-fwd.fif") or load_path.name.endswith("free-fwd.fif")): self.logger.warning(f"File {load_path} does not have a standard forward solution suffix. Attempting to load anyway.") try: self.logger.info(f"Loading forward solution from file: {load_path}") fwd = mne.read_forward_solution(load_path) # Optional: Check if loaded orientation matches config loaded_ori = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free" if loaded_ori != orientation_type: self.logger.warning(f"Loaded forward solution orientation ('{loaded_ori}') does not match config ('{orientation_type}'). Using loaded orientation.") orientation_type = loaded_ori # Update based on loaded file return fwd except Exception as e: self.logger.warning(f"Failed to load forward solution from file: {e}. Proceeding to create it.") else: self.logger.warning(f"Forward solution file does not exist: {load_path}. Proceeding to create it.") else: self.logger.info("No forward solution file specified. Proceeding to create it.") self.logger.info("Creating forward solution...") valid_kwargs = ["trans", "eeg", "meg", "mindist", "ignore_ref", "n_jobs", "verbose"] kwargs = {key: value for key, value in forward_solution_cfg.items() if key in valid_kwargs} # Ensure trans is provided if needed if 'trans' not in kwargs or not kwargs['trans']: self.logger.warning("Transformation file ('trans') not specified in forward_solution config. Using default 'fsaverage-trans.fif'. This might be incorrect.") kwargs['trans'] = 'fsaverage-trans.fif' # Or handle error differently fwd = mne.make_forward_solution( info=info, src=src, bem=bem, **kwargs ) self.logger.info("Forward solution created successfully.") if orientation_type == "fixed": surf_ori = forward_solution_cfg.get("surf_ori", True) force_fixed = forward_solution_cfg.get("force_fixed", True) self.logger.info(f"Converting forward solution to fixed orientation (surf_ori={surf_ori}, force_fixed={force_fixed})...") fwd = mne.convert_forward_solution(fwd, force_fixed=force_fixed, surf_ori=surf_ori) self.logger.info("Orientation fixed successfully.") elif fwd["source_ori"] != FIFF.FIFFV_MNE_FREE_ORI: # If config asked for free but make_forward_solution didn't yield it (unlikely) self.logger.warning("Expected free orientation but forward solution is not. Check configuration.") if forward_solution_cfg.get("save", False): # Determine final orientation type for saving filename final_orientation = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free" save_fname = self.save_path / f"{self.subject}-{final_orientation}-fwd.fif" verbose = forward_solution_cfg.get("verbose") overwrite = forward_solution_cfg.get("overwrite", False) self.logger.info(f"Saving forward solution to file: {save_fname}") mne.write_forward_solution(save_fname, fwd, overwrite=overwrite, verbose=verbose) return fwd
[docs] def handle_leadfield(self, fwd: Optional[mne.Forward] = None) -> np.ndarray: """ Load or extract the leadfield matrix. Reads the leadfield matrix from the .npz file specified in `config['leadfield']['fname']` if it exists. Otherwise, extracts the leadfield data from the provided forward solution `fwd`. Reshapes the matrix for free orientation if necessary. Saves the extracted leadfield matrix if `config['leadfield']['save']` is True. Parameters ---------- fwd : mne.Forward, optional The forward solution object. Required if the leadfield matrix is not loaded from a file, by default None. Returns ------- np.ndarray The leadfield matrix. Shape is (n_sensors, n_sources) for fixed orientation or (n_sensors, n_sources, 3) for free orientation. Raises ------ ValueError If the leadfield cannot be loaded from a file and `fwd` is not provided, or if a loaded leadfield file is invalid. """ leadfield_cfg = self.config.get("leadfield", {}) fname = leadfield_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): try: self.logger.info(f"Loading leadfield matrix from file: {load_path}") with np.load(load_path) as data: if "leadfield" not in data: raise ValueError(f"Invalid .npz file: 'leadfield' key not found in {load_path}") leadfield = data["leadfield"] # Basic validation based on expected dimensions if leadfield.ndim == 2: self.logger.info(f"Loaded fixed orientation leadfield: {leadfield.shape}") elif leadfield.ndim == 3 and leadfield.shape[-1] == 3: self.logger.info(f"Loaded free orientation leadfield: {leadfield.shape}") else: raise ValueError(f"Loaded leadfield has unexpected shape: {leadfield.shape}") return leadfield except Exception as e: self.logger.warning(f"Failed to load leadfield matrix from file: {e}. Proceeding to compute it.") else: self.logger.warning(f"Leadfield file does not exist: {load_path}. Proceeding to compute it.") else: self.logger.info("No leadfield file specified. Proceeding to compute it.") if fwd is None: raise ValueError("Forward solution (fwd) must be provided to compute leadfield matrix when not loading from file.") self.logger.info("Extracting leadfield matrix from the forward solution...") leadfield = fwd["sol"]["data"] orientation_type = None if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI: self.logger.info(f"Extracted fixed orientation leadfield: {leadfield.shape}") orientation_type = "fixed" # Shape is already (n_sensors, n_sources) elif fwd["source_ori"] == FIFF.FIFFV_MNE_FREE_ORI: self.logger.info("Extracted free orientation leadfield (raw shape: %s)", leadfield.shape) n_sensors, n_sources_x_orient = leadfield.shape n_orient = 3 if n_sources_x_orient % n_orient != 0: raise ValueError(f"Cannot reshape free orientation leadfield. Shape {leadfield.shape} not divisible by 3.") n_sources = n_sources_x_orient // n_orient leadfield = leadfield.reshape(n_sensors, n_sources, n_orient) self.logger.info(f"Reshaped free orientation leadfield: {leadfield.shape}") orientation_type = "free" else: self.logger.warning("Unknown leadfield orientation type in forward solution.") if leadfield_cfg.get("save", False) and orientation_type: save_fname = self.save_path / f"{self.subject}-leadfield-{orientation_type}.npz" self.logger.info(f"Saving computed leadfield matrix to file: {save_fname}") np.savez(save_fname, leadfield=leadfield) return leadfield
[docs] def simulate(self) -> np.ndarray: """ Run the full simulation pipeline. Executes the sequence of handling/creating source space, BEM model, info object, forward solution, and finally the leadfield matrix, based on the provided configuration. Returns ------- np.ndarray The final leadfield matrix. Shape depends on the orientation type. Raises ------ RuntimeError If any step in the pipeline fails. """ self.logger.info("Starting leadfield simulation pipeline...") try: src = self.handle_source_space() bem = self.handle_bem_model() info = self.handle_info() fwd = self.handle_forward_solution(info, src, bem) leadfield = self.handle_leadfield(fwd) except Exception as e: self.logger.error(f"Pipeline failed: {e}", exc_info=True) # Log traceback raise RuntimeError("Simulation pipeline failed.") from e self.logger.info("Leadfield simulation pipeline completed successfully.") return leadfield
[docs] def get_leadfield(self, subject: str = "fsaverage", orientation_type: str = "fixed", retrieve_mode: str = "load") -> np.ndarray: """ Get or generate the leadfield matrix based on the specified mode. Updates self.n_sensors and self.n_sources based on the obtained leadfield. Parameters ---------- subject : str Returns ------- np.ndarray The leadfield matrix (L). Shape depends on orientation_type: - 'fixed': (n_sensors, n_sources) - 'free': (n_sensors, n_sources, 3) Raises ------ ValueError If retrieve_mode is invalid, required paths are missing, or loaded/simulated leadfield has unexpected dimensions/format. FileNotFoundError If leadfield_dir does not exist when mode='load'. """ if orientation_type == "fixed": n_orient = 1 expected_dimensions = 2 # Fixed orientation leads to 2D leadfield (n_sensors, n_sources) expected_suffix = "-fixed.npz" elif orientation_type == "free": expected_suffix = "-free.npz" n_orient = 3 expected_dimensions = 3 # Free orientation leads to 3D leadfield (n_sensors, n_sources, 3) else: raise ValueError(f"Invalid orientation_type '{orientation_type}'. Choose 'fixed' or 'free'.") if retrieve_mode == "load": # Define the two specific patterns you were trying to match: # Pattern 1: Includes orientation_type in the filename path_option1 = self.leadfield_dir / f"lead_field_{orientation_type}_{subject}.npz" # Pattern 2: Excludes orientation_type from the filename path_option2 = self.leadfield_dir / f"lead_field_{subject}.npz" try: if path_option1.exists(): leadfield_path = path_option1 elif path_option2.exists(): leadfield_path = path_option2 else: self.logger.warning( f"Leadfield file not found for subject '{subject}' with orientation '{orientation_type}' " f"in directory '{self.leadfield_dir}'.\n" f"Checked specific patterns:\n" f" - {path_option1}\n" f" - {path_option2}") raise FileNotFoundError( f"Leadfield file not found for subject '{subject}' in directory '{self.leadfield_dir}'. " ) self.logger.info(f"Loading leadfield matrix from file: {leadfield_path}") with np.load(leadfield_path) as data: if "leadfield" not in data and "lead_field" not in data: raise ValueError(f"File {leadfield_path} does not contain 'leadfield' or 'lead_field' key.") leadfield = data["leadfield"] if "leadfield" in data else data["lead_field"] try: data_unit = data.get("units", None) if data_unit is not None: if data_unit == FIFF.FIFF_UNIT_V: self.sensor_units = FIFF.FIFF_UNIT_V elif data_unit == FIFF.FIFF_UNIT_T: self.sensor_units = FIFF.FIFF_UNIT_T elif data_unit == FIFF.FIFF_UNIT_T_M: self.sensor_units = FIFF.FIFF_UNIT_T_M else: self.sensor_units = "unknown" self.logger.warning(f"Unknown data unit '{data_unit}' in leadfield file. Defaulting to provided unit.") else: self.sensor_units = ["unknown"] self.logger.warning(f"Unknown data unit '{data_unit}' in leadfield file. Defaulting to 'unknown'.") except KeyError: self.logger.warning("No data unit specified in leadfield file. Defaulting to 'unknown'.") self.sensor_units = ["unknown"] if leadfield.ndim != expected_dimensions: raise ValueError( f"Loaded leadfield matrix dimension mismatch for orientation '{orientation_type}': " f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}." ) self.logger.info(f"Leadfield loaded with shape {leadfield.shape}") except (FileNotFoundError, ValueError) as e: self.logger.error(f"Failed to load leadfield matrix: {e}") raise elif retrieve_mode == "simulate": if not config: raise ValueError("Path to the configuration file (config) must be provided when retrieve_mode='simulate'.") self.logger.info(f"Simulating leadfield matrix using LeadfieldBuilder with config: {config}") try: config = load_config(Path(config)) L_simulator = LeadfieldBuilder(config=config, logger=self.logger) leadfield = L_simulator.simulate() self.logger.info(f"Simulated leadfield matrix with shape {leadfield.shape}") if leadfield.ndim != expected_dimensions: raise ValueError( f"Simulated leadfield matrix dimension mismatch for orientation '{orientation_type}': " f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}." ) # TODO: Handle channel units based on L_simulator.channel_type # if L_simulator.channel_type == "eeg": # self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0] # elif L_simulator.channel_type == "meg": # self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0] # elif L_simulator.channel_type == "grad": # self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0] # elif not L_simulator.channel_type: # self.sensor_units = [FIFF.FIFF_UNIT_UNKNOWN] * leadfield.shape[0] # self.logger.warning("Channel type not specified. Defaulting to 'unknown'.") # else: # raise ValueError(f"Invalid channel_type '{L_simulator.channel_type}'. Choose 'eeg', 'meg', or 'grad'.") except Exception as e: self.logger.error(f"Failed to simulate leadfield matrix: {e}") raise elif retrieve_mode == "random": self.logger.info(f"Generating a random leadfield matrix (n_sensors={n_sensors}, n_sources={n_sources}).") rng = check_random_state(64) if orientation_type == "fixed": leadfield = rng.standard_normal((n_sensors, n_sources)) else: leadfield = rng.standard_normal((n_sensors, n_sources, 3)) self.logger.info(f"Random leadfield generated with shape {leadfield.shape}") # TODO: Handle channel units based on self.channel_type # if self.channel_type == "eeg": # self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0] # elif self.channel_type == "meg": # self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0] # elif self.channel_type == "grad": # self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0] # else: # raise ValueError(f"Invalid channel_type '{self.channel_type}'. Choose 'eeg', 'meg', or 'grad'.") else: raise ValueError(f"Invalid leadfield mode '{self.retrieve_mode}'. Options are 'load', 'simulate', or 'random'.") # Update n_sensors and n_sources based on the actual leadfield dimensions if leadfield.ndim == 2 == expected_dimensions: # Fixed n_sensors, n_sources = leadfield.shape elif leadfield.ndim == 3 == expected_dimensions: # Free n_sensors, n_sources, _ = leadfield.shape # self.leadfield_units = f"{sensor_units} / {source_units}" self.logger.info(f"Leadfield obtained. Updated n_sensors={n_sensors}, n_sources={n_sources}") return leadfield