Source code for calibrain.leadfield_builder

"""
Module for simulating leadfield matrices using MNE-Python.

Provides a configurable framework for setting up and running a leadfield
simulation pipeline. Handles various MNE-Python components, such as source
space, BEM model, montage, info object, forward solution, and leadfield matrix,
allowing loading from files or creating them based on configuration.
"""

import logging
from dataclasses import dataclass
import numpy as np
from pathlib import Path
from mne.io.constants import FIFF
import mne
from typing import Dict, Optional, Union
from calibrain.utils import load_config
from sklearn.utils import check_random_state


@dataclass
class LeadfieldData:
    leadfield: Optional[np.ndarray] = None
    sensor_kind: Optional[int] = None
    sensor_units: Optional[int] = None
    sensor_unitmult: Optional[int] = None
    coil_type: Optional[int] = None
    src_coords: Optional[np.ndarray] = None
    Q_basis: Optional[np.ndarray] = None

[docs] class LeadfieldBuilder: """ Simulates leadfield matrices based on a configuration dictionary. Handles the creation or loading of necessary MNE-Python components like source space, BEM model, montage, info, and forward solution. Attributes ---------- config : dict The configuration dictionary driving the simulation process. self.logger : logging.Logger Logger instance for logging messages. data_path : Path Path to the data directory. subject : str Subject identifier. subjects_dir : Path Path to the FreeSurfer subjects directory. save_path : Path Base path where generated files (source space, BEM, etc.) will be saved. """
[docs] def __init__(self, config: Dict = None, leadfield_dir: str = None, logger: Optional[logging.Logger] = None): """ Initialize the LeadfieldBuilder. Parameters ---------- config : dict Configuration dictionary containing paths and parameters for each simulation step (data, source_space, bem_model, montage, info, forward_solution, leadfield). channel_type : str, optional Type of channels to simulate ('eeg', 'meg', or 'grad'). If None, all channel types will be simulated. Default is None. leadfield_dir : str, optional Directory where leadfield matrices are stored or will be saved. self.logger : logging.Logger, optional Custom self.logger instance. If None, a default self.logger is created, by default None. Raises ------ FileNotFoundError If `data_path` or `subjects_dir` specified in the config do not exist. """ self.config = config if self.config: data_cfg = config["data"] self.subject = data_cfg["subject"] self.data_path = Path(data_cfg["data_path"]) self.subjects_dir = Path(data_cfg["subjects_dir"]) self.save_path = Path(data_cfg["save_path"]) if not self.data_path.exists(): raise FileNotFoundError(f"Data path does not exist: {self.data_path}") if not self.subjects_dir.exists(): raise FileNotFoundError(f"Subjects directory does not exist: {self.subjects_dir}") if not self.save_path.exists(): self.logger.info(f"Save path does not exist. Creating: {self.save_path}") self.save_path.mkdir(parents=True, exist_ok=True) self.logger = logger if logger else logging.getLogger(__name__) self.leadfield_dir = Path(leadfield_dir) # Parameters describing the simulated/loaded sensors self.sensor_units: Optional[int] = None self.sensor_unitmult: Optional[int] = None self.sensor_kind: Optional[int] = None self.coil_type: Optional[int] = None
def __getstate__(self): state = self.__dict__.copy() state['logger'] = None return state def __setstate__(self, state): self.__dict__.update(state) if self.logger is None: self.logger = logging.getLogger(self.__class__.__name__) def _set_sensor_metadata( self, *, kind: Optional[Union[int, np.integer]] = None, unit: Optional[Union[int, np.integer]] = None, unit_mul: Optional[Union[int, np.integer]] = None, coil_type: Optional[Union[int, np.integer]] = None, ) -> None: """Assign sensor metadata, coercing numpy scalars to ints.""" if kind is not None: self.sensor_kind = int(kind) # 1 for FIFFV_MEG_CH, 2 for FIFFV_EEG_CH if unit is not None: self.sensor_units = int(unit) if unit_mul is not None: self.sensor_unitmult = int(unit_mul) if coil_type is not None: self.coil_type = int(coil_type) def _scale_leadfield( self, leadfield: np.ndarray, metadata: Optional[LeadfieldData] = None, ) -> np.ndarray: """Scale leadfield to fT/nAm or µV/nAm for better readability.""" sensor_units = metadata.sensor_units if metadata else self.sensor_units sensor_unitmult = metadata.sensor_unitmult if metadata else self.sensor_unitmult if sensor_units is None: return leadfield target_mul = None scaler = None if sensor_units in (FIFF.FIFF_UNIT_T, FIFF.FIFF_UNIT_T_M): target_mul = FIFF.FIFF_UNITM_F # femto # Convert data from T / (Am) to fT / (nAm): # 1 T = 1e15 fT and 1 Am = 1e9 nAm, so the combined scaling factor is 1e6 scaler = 10 ** (-FIFF.FIFF_UNITM_F + FIFF.FIFF_UNITM_N) elif sensor_units == FIFF.FIFF_UNIT_V: target_mul = FIFF.FIFF_UNITM_MU # micro (10^-6) # Convert from V / (nAm) to µV / (Am): # 1 V = 1e6 µV and 1 nAm = 1e-9 Am -> combined scaler = 1e-3 scaler = 10 ** (-FIFF.FIFF_UNITM_MU + FIFF.FIFF_UNITM_N) if target_mul is None or scaler is None: return leadfield if sensor_unitmult == target_mul: return leadfield if sensor_unitmult not in (None, FIFF.FIFF_UNITM_NONE): return leadfield leadfield = leadfield * scaler if metadata is not None: metadata.sensor_unitmult = target_mul else: self.sensor_unitmult = target_mul return leadfield def _reshape_free_orientation(self, leadfield: np.ndarray) -> np.ndarray: """Convert (n_sensors, 3 * n_sources) -> (n_sensors, n_sources, 3).""" if leadfield.ndim != 2: return leadfield n_sensors, n_components = leadfield.shape if n_components % 3 != 0: raise ValueError( f"Free-orientation leadfield has shape {leadfield.shape}; " "the second dimension must be divisible by 3." ) n_sources = n_components // 3 reshaped = leadfield.reshape(n_sensors, n_sources, 3) self.logger.debug( "Converted free-orientation leadfield from shape %s to %s", leadfield.shape, reshaped.shape, ) return reshaped def _store_sensor_metadata_from_info(self, info: Optional[mne.Info]) -> None: """Capture sensor metadata (kind/unit/unit_mul) from an info object.""" if info is None or "chs" not in info or not info["chs"]: return chs = info["chs"] first = chs[0] kind = first.get("kind") unit = first.get("unit") unit_mul = first.get("unit_mul") coil_type = first.get("coil_type") self._set_sensor_metadata( kind=kind, unit=unit, unit_mul=unit_mul, coil_type=coil_type )
[docs] def handle_source_space(self) -> mne.SourceSpaces: """ Load or create the source space. Reads the source space from the file specified in `config['source_space']['fname']` if it exists. Otherwise, creates a new source space using `mne.setup_source_space` with parameters from the configuration. Saves the created source space if `config['source_space']['save']` is True. Returns ------- mne.SourceSpaces The loaded or created source space object. """ source_space_cfg = self.config["source_space"] fname = source_space_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading source space from file: {load_path}") return mne.read_source_spaces(load_path) else: self.logger.warning(f"Source space file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No source space file specified. Creating from scratch...") valid_kwargs = ["spacing", "surface", "add_dist", "n_jobs", "verbose"] kwargs = {key: value for key, value in source_space_cfg.items() if key in valid_kwargs} src = mne.setup_source_space(subject=self.subject, subjects_dir=self.subjects_dir, **kwargs) if source_space_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-src.fif" verbose = source_space_cfg.get("verbose") # Let MNE handle default if None overwrite = source_space_cfg.get("overwrite", False) # Default to False self.logger.info(f"Saving source space to file: {save_fname}") mne.write_source_spaces(save_fname, src, overwrite=overwrite, verbose=verbose) return src
[docs] def handle_bem_model(self) -> mne.bem.ConductorModel: """ Load or create the BEM (Boundary Element Model) solution. Reads the BEM solution from the file specified in `config['bem_model']['fname']` if it exists. Otherwise, creates a new BEM model using `mne.make_bem_model` and computes the solution using `mne.make_bem_solution` with parameters from the configuration. Saves the created BEM solution if `config['bem_model']['save']` is True. Returns ------- mne.bem.ConductorModel The loaded or created BEM solution object. """ bem_model_cfg = self.config["bem_model"] fname = bem_model_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading BEM model from file: {load_path}") return mne.read_bem_solution(load_path) else: self.logger.warning(f"BEM model file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No BEM model file specified. Creating from scratch...") valid_model_kwargs = ["ico", "conductivity", "verbose"] model_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_model_kwargs} bem_model = mne.make_bem_model(subject=self.subject, subjects_dir=self.subjects_dir, **model_kwargs) valid_sol_kwargs = ["solver", "verbose"] sol_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_sol_kwargs} bem = mne.make_bem_solution(surfs=bem_model, **sol_kwargs) if bem_model_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-bem.fif" verbose = bem_model_cfg.get("verbose") overwrite = bem_model_cfg.get("overwrite", False) self.logger.info(f"Saving BEM model to file: {save_fname}") mne.write_bem_solution(save_fname, bem, overwrite=overwrite, verbose=verbose) return bem
[docs] def handle_montage(self) -> mne.channels.DigMontage: """ Load or create the sensor montage. Reads the montage from the file specified in `config['montage']['fname']` if it exists. Otherwise, creates a standard montage using `mne.channels.make_standard_montage` with parameters from the configuration. Saves the created montage if `config['montage']['save']` is True. Returns ------- mne.channels.DigMontage The loaded or created montage object. """ montage_cfg = self.config.get("montage", {}) fname = montage_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading montage from file: {load_path}") # Assuming FIF format based on save method return mne.channels.read_dig_fif(load_path) else: self.logger.warning(f"Montage file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No montage file specified. Creating from scratch...") valid_kwargs = ["kind", "head_size"] # Parameters for make_standard_montage kwargs = {key: value for key, value in montage_cfg.items() if key in valid_kwargs} if not kwargs.get("kind"): self.logger.warning("Montage 'kind' not specified in config, cannot create standard montage. Returning None.") return None # Cannot create without kind montage = mne.channels.make_standard_montage(**kwargs) if montage_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-montage.fif" verbose = montage_cfg.get("verbose") overwrite = montage_cfg.get("overwrite", False) self.logger.info(f"Saving montage to file: {save_fname}") montage.save(save_fname, overwrite=overwrite, verbose=verbose) return montage
[docs] def handle_info(self) -> mne.Info: """ Load or create the MNE measurement info object. Reads the info from the file specified in `config['info']['fname']` if it exists. Otherwise, creates a new info object using `mne.create_info` with parameters from the configuration, associating it with the montage obtained via `handle_montage`. Saves the created info object if `config['info']['save']` is True. Returns ------- mne.Info The loaded or created info object. Raises ------ ValueError If info cannot be created because the montage is missing or invalid. """ info_cfg = self.config["info"] fname = info_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): self.logger.info(f"Loading info object from file: {load_path}") info = mne.io.read_info(load_path) self._store_sensor_metadata_from_info(info) return info else: self.logger.warning(f"Info file does not exist: {load_path}. Creating from scratch...") else: self.logger.info("No info file specified. Creating from scratch...") montage = self.handle_montage() if montage is None or not hasattr(montage, 'ch_names'): raise ValueError("Cannot create info object without a valid montage.") valid_kwargs = ["sfreq", "ch_types", "verbose"] kwargs = {key: value for key, value in info_cfg.items() if key in valid_kwargs} if not kwargs.get("sfreq") or not kwargs.get("ch_types"): raise ValueError("Cannot create info object without 'sfreq' and 'ch_types' in config.") info = mne.create_info( ch_names=montage.ch_names, **kwargs ) info.set_montage(montage) self._store_sensor_metadata_from_info(info) if info_cfg.get("save", False): save_fname = self.save_path / f"{self.subject}-info.fif" # write_info doesn't have overwrite, it overwrites by default self.logger.info(f"Saving info object to file: {save_fname}") mne.io.write_info(save_fname, info) return info
[docs] def handle_forward_solution(self, info: mne.Info, src: mne.SourceSpaces, bem: mne.bem.ConductorModel) -> mne.Forward: """ Load or create the forward solution. Reads the forward solution from the file specified in `config['forward_solution']['fname']` if it exists and has a valid suffix. Otherwise, creates a new forward solution using `mne.make_forward_solution` with parameters from the configuration. Converts the solution to fixed orientation if `config['forward_solution']['orientation_type']` is 'fixed'. Saves the created forward solution if `config['forward_solution']['save']` is True. Parameters ---------- info : mne.Info The measurement info object. src : mne.SourceSpaces The source space object. bem : mne.bem.ConductorModel The BEM solution object. Returns ------- mne.Forward The loaded or created forward solution object. """ forward_solution_cfg = self.config.get("forward_solution", {}) orientation_type = forward_solution_cfg.get("orientation_type", "free") # Default to free fname = forward_solution_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): # Basic check for expected suffix if not (load_path.name.endswith("fixed-fwd.fif") or load_path.name.endswith("free-fwd.fif")): self.logger.warning(f"File {load_path} does not have a standard forward solution suffix. Attempting to load anyway.") try: self.logger.info(f"Loading forward solution from file: {load_path}") fwd = mne.read_forward_solution(load_path) self._store_sensor_metadata_from_info(fwd["info"]) # Optional: Check if loaded orientation matches config loaded_ori = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free" if loaded_ori != orientation_type: self.logger.warning(f"Loaded forward solution orientation ('{loaded_ori}') does not match config ('{orientation_type}'). Using loaded orientation.") orientation_type = loaded_ori # Update based on loaded file return fwd except Exception as e: self.logger.warning(f"Failed to load forward solution from file: {e}. Proceeding to create it.") else: self.logger.warning(f"Forward solution file does not exist: {load_path}. Proceeding to create it.") else: self.logger.info("No forward solution file specified. Proceeding to create it.") self.logger.info("Creating forward solution...") valid_kwargs = ["trans", "eeg", "meg", "mindist", "ignore_ref", "n_jobs", "verbose"] kwargs = {key: value for key, value in forward_solution_cfg.items() if key in valid_kwargs} # Ensure trans is provided if needed if 'trans' not in kwargs or not kwargs['trans']: self.logger.warning("Transformation file ('trans') not specified in forward_solution config. Using default 'fsaverage-trans.fif'. This might be incorrect.") kwargs['trans'] = 'fsaverage-trans.fif' # Or handle error differently fwd = mne.make_forward_solution( info=info, src=src, bem=bem, **kwargs ) self.logger.info("Forward solution created successfully.") self._store_sensor_metadata_from_info(fwd["info"]) if orientation_type == "fixed": surf_ori = forward_solution_cfg.get("surf_ori", True) force_fixed = forward_solution_cfg.get("force_fixed", True) self.logger.info(f"Converting forward solution to fixed orientation (surf_ori={surf_ori}, force_fixed={force_fixed})...") fwd = mne.convert_forward_solution(fwd, force_fixed=force_fixed, surf_ori=surf_ori) self.logger.info("Orientation fixed successfully.") elif fwd["source_ori"] != FIFF.FIFFV_MNE_FREE_ORI: # If config asked for free but make_forward_solution didn't yield it (unlikely) self.logger.warning("Expected free orientation but forward solution is not. Check configuration.") if forward_solution_cfg.get("save", False): # Determine final orientation type for saving filename final_orientation = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free" save_fname = self.save_path / f"{self.subject}-{final_orientation}-fwd.fif" verbose = forward_solution_cfg.get("verbose") overwrite = forward_solution_cfg.get("overwrite", False) self.logger.info(f"Saving forward solution to file: {save_fname}") mne.write_forward_solution(save_fname, fwd, overwrite=overwrite, verbose=verbose) return fwd
[docs] def handle_leadfield(self, fwd: Optional[mne.Forward] = None) -> np.ndarray: """ Load or extract the leadfield matrix. Reads the leadfield matrix from the .npz file specified in `config['leadfield']['fname']` if it exists. Otherwise, extracts the leadfield data from the provided forward solution `fwd`. Reshapes the matrix for free orientation if necessary. Saves the extracted leadfield matrix if `config['leadfield']['save']` is True. Parameters ---------- fwd : mne.Forward, optional The forward solution object. Required if the leadfield matrix is not loaded from a file, by default None. Returns ------- np.ndarray The leadfield matrix. Shape is (n_sensors, n_sources) for fixed orientation or (n_sensors, n_sources, 3) for free orientation. Raises ------ ValueError If the leadfield cannot be loaded from a file and `fwd` is not provided, or if a loaded leadfield file is invalid. """ leadfield_cfg = self.config.get("leadfield", {}) fname = leadfield_cfg.get("fname") if fname: load_path = Path(fname) if load_path.exists(): try: self.logger.debug(f"Loading leadfield matrix from file: {load_path}") with np.load(load_path) as data: if "leadfield" not in data: raise ValueError(f"Invalid .npz file: 'leadfield' key not found in {load_path}") leadfield = data["leadfield"] # Basic validation based on expected dimensions if leadfield.ndim == 2: self.logger.info(f"Loaded fixed orientation leadfield: {leadfield.shape}") elif leadfield.ndim == 3 and leadfield.shape[-1] == 3: self.logger.info(f"Loaded free orientation leadfield: {leadfield.shape}") else: raise ValueError(f"Loaded leadfield has unexpected shape: {leadfield.shape}") leadfield = self._scale_leadfield(leadfield) return leadfield except Exception as e: self.logger.warning(f"Failed to load leadfield matrix from file: {e}. Proceeding to compute it.") else: self.logger.warning(f"Leadfield file does not exist: {load_path}. Proceeding to compute it.") else: self.logger.info("No leadfield file specified. Proceeding to compute it.") if fwd is None: raise ValueError("Forward solution (fwd) must be provided to compute leadfield matrix when not loading from file.") self._store_sensor_metadata_from_info(fwd["info"]) self.logger.info("Extracting leadfield matrix from the forward solution...") leadfield = fwd["sol"]["data"] orientation_type = None if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI: self.logger.info(f"Extracted fixed orientation leadfield: {leadfield.shape}") orientation_type = "fixed" # Shape is already (n_sensors, n_sources) elif fwd["source_ori"] == FIFF.FIFFV_MNE_FREE_ORI: self.logger.info("Extracted free orientation leadfield (raw shape: %s)", leadfield.shape) n_sensors, n_sources_x_orient = leadfield.shape n_orient = 3 if n_sources_x_orient % n_orient != 0: raise ValueError(f"Cannot reshape free orientation leadfield. Shape {leadfield.shape} not divisible by 3.") n_sources = n_sources_x_orient // n_orient leadfield = leadfield.reshape(n_sensors, n_sources, n_orient) self.logger.info(f"Reshaped free orientation leadfield: {leadfield.shape}") orientation_type = "free" else: self.logger.warning("Unknown leadfield orientation type in forward solution.") leadfield = self._scale_leadfield(leadfield) if leadfield_cfg.get("save", False) and orientation_type: save_fname = self.save_path / f"{self.subject}-leadfield-{orientation_type}.npz" self.logger.info(f"Saving computed leadfield matrix to file: {save_fname}") payload = {"leadfield": leadfield} if self.sensor_units is not None: payload["sensor_units"] = self.sensor_units if self.sensor_unitmult is not None: payload["sensor_unitmult"] = self.sensor_unitmult if self.sensor_kind is not None: payload["sensor_kind"] = self.sensor_kind if self.coil_type is not None: payload["coil_type"] = self.coil_type np.savez(save_fname, **payload) return leadfield
[docs] def simulate(self) -> np.ndarray: """ Run the full simulation pipeline. Executes the sequence of handling/creating source space, BEM model, info object, forward solution, and finally the leadfield matrix, based on the provided configuration. Returns ------- np.ndarray The final leadfield matrix. Shape depends on the orientation type. Raises ------ RuntimeError If any step in the pipeline fails. """ self.logger.info("Starting leadfield simulation pipeline...") try: src = self.handle_source_space() bem = self.handle_bem_model() info = self.handle_info() fwd = self.handle_forward_solution(info, src, bem) leadfield = self.handle_leadfield(fwd) except Exception as e: self.logger.error(f"Pipeline failed: {e}", exc_info=True) # Log traceback raise RuntimeError("Simulation pipeline failed.") from e self.logger.info("Leadfield simulation pipeline completed successfully.") return leadfield
[docs] def get_leadfield( self, subject: str = "fsaverage", orientation_type: str = "fixed", retrieve_mode: str = "load", config: Optional[Union[str, Path, Dict]] = None, n_sensors: int = 64, n_sources: int = 1284, return_metadata: bool = False, ) -> Union[np.ndarray, LeadfieldData]: """ Get or generate the leadfield matrix based on the specified mode. Updates self.n_sensors and self.n_sources based on the obtained leadfield. Parameters ---------- subject : str Returns ------- np.ndarray or LeadfieldData The leadfield matrix (L) when ``return_metadata`` is False. When True, returns a ``LeadfieldData`` object that includes both the leadfield matrix and its associated sensor metadata. Raises ------ ValueError If retrieve_mode is invalid, required paths are missing, or loaded/simulated leadfield has unexpected dimensions/format. FileNotFoundError If leadfield_dir does not exist when mode='load'. """ if orientation_type == "fixed": expected_dimensions = 2 # Fixed orientation leads to 2D leadfield (n_sensors, n_sources) expected_suffix = "-fixed.npz" elif orientation_type == "free": expected_suffix = "-free.npz" expected_dimensions = 3 # Free orientation leads to 3D leadfield (n_sensors, n_sources, 3) else: raise ValueError(f"Invalid orientation_type '{orientation_type}'. Choose 'fixed' or 'free'.") metadata = LeadfieldData() if retrieve_mode == "load": # Define the two specific patterns you were trying to match: # Pattern 1: Includes orientation_type in the filename leadfield_path = self.leadfield_dir / f"{subject}_{orientation_type}_leadfield.npz" try: if leadfield_path.exists(): leadfield_path = leadfield_path else: self.logger.warning( f"Leadfield file not found for subject '{subject}' with orientation '{orientation_type}' " f"in directory '{self.leadfield_dir}'.\n" f"Checked specific patterns:\n" f" - {leadfield_path}" ) raise FileNotFoundError( f"Leadfield file not found for subject '{subject}' in directory '{self.leadfield_dir}'." ) self.logger.debug(f"Loading leadfield matrix from file: {leadfield_path}") with np.load(leadfield_path, allow_pickle=True) as data: if "leadfield" not in data and "leadfield" not in data: raise ValueError(f"File {leadfield_path} does not contain 'leadfield' or 'leadfield' key.") leadfield = data["leadfield"] if "leadfield" in data else data["leadfield"] metadata = LeadfieldData( sensor_kind=data.get("sensor_kind"), sensor_units=data.get("sensor_units", data.get("units")), sensor_unitmult=data.get("sensor_unitmult"), coil_type=data.get("coil_type"), src_coords=data.get("src_coords"), Q_basis=data.get("Q_basis"), ) leadfield = self._scale_leadfield(leadfield, metadata=metadata) # if orientation_type == "free": # leadfield = self._reshape_free_orientation(leadfield) if leadfield.ndim != expected_dimensions: raise ValueError( f"Loaded leadfield matrix dimension mismatch for orientation '{orientation_type}': " f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}." ) self.logger.debug(f"Leadfield loaded with shape {leadfield.shape}") except (FileNotFoundError, ValueError) as e: self.logger.error(f"Failed to load leadfield matrix: {e}") raise elif retrieve_mode == "simulate": self.logger.debug(f"Simulating leadfield matrix using LeadfieldBuilder with config: {config}") try: if config is not None: if isinstance(config, (str, Path)): config = load_config(Path(config)) L_simulator = LeadfieldBuilder( config=config, leadfield_dir=self.leadfield_dir, logger=self.logger, ) else: L_simulator = self leadfield = L_simulator.simulate() self.logger.info(f"Simulated leadfield matrix with shape {leadfield.shape}") metadata = LeadfieldData( sensor_units=L_simulator.sensor_units, sensor_unitmult=L_simulator.sensor_unitmult, sensor_kind=L_simulator.sensor_kind, coil_type=L_simulator.coil_type, src_coords=getattr(L_simulator, "src_coords", None), Q_basis=getattr(L_simulator, "Q_basis", None), ) if orientation_type == "free": leadfield = self._reshape_free_orientation(leadfield) if leadfield.ndim != expected_dimensions: raise ValueError( f"Simulated leadfield matrix dimension mismatch for orientation '{orientation_type}': " f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}." ) # TODO: Handle channel units based on L_simulator.channel_type # if L_simulator.channel_type == "eeg": # self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0] # elif L_simulator.channel_type == "meg": # self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0] # elif L_simulator.channel_type == "grad": # self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0] # elif not L_simulator.channel_type: # self.sensor_units = [FIFF.FIFF_UNIT_UNKNOWN] * leadfield.shape[0] # self.logger.warning("Channel type not specified. Defaulting to 'unknown'.") # else: # raise ValueError(f"Invalid channel_type '{L_simulator.channel_type}'. Choose 'eeg', 'meg', or 'grad'.") except Exception as e: self.logger.error(f"Failed to simulate leadfield matrix: {e}") raise elif retrieve_mode == "random": self.logger.info(f"Generating a random leadfield matrix (n_sensors={n_sensors}, n_sources={n_sources}).") rng = check_random_state(64) if orientation_type == "fixed": leadfield = rng.standard_normal((n_sensors, n_sources)) else: leadfield = rng.standard_normal((n_sensors, n_sources, 3)) self.logger.info(f"Random leadfield generated with shape {leadfield.shape}") # TODO: Handle channel units based on self.channel_type # if self.channel_type == "eeg": # self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0] # elif self.channel_type == "meg": # self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0] # elif self.channel_type == "grad": # self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0] # else: # raise ValueError(f"Invalid channel_type '{self.channel_type}'. Choose 'eeg', 'meg', or 'grad'.") else: raise ValueError(f"Invalid leadfield mode '{retrieve_mode}'. Options are 'load', 'simulate', or 'random'.") # Update n_sensors and n_sources based on the actual leadfield dimensions if leadfield.ndim == 2 == expected_dimensions: # Fixed n_sensors, n_sources = leadfield.shape elif leadfield.ndim == 3 == expected_dimensions: # Free n_sensors, n_sources, _ = leadfield.shape self.logger.debug(f"Leadfield obtained. Updated n_sensors={n_sensors}, n_sources={n_sources}") metadata.leadfield = leadfield metadata.src_coords = metadata.src_coords self._set_sensor_metadata( kind=metadata.sensor_kind, unit=metadata.sensor_units, unit_mul=metadata.sensor_unitmult, coil_type=metadata.coil_type, ) if return_metadata: return metadata return leadfield