"""
Module for simulating leadfield matrices using MNE-Python.
Provides a configurable framework for setting up and running a leadfield
simulation pipeline. Handles various MNE-Python components, such as source
space, BEM model, montage, info object, forward solution, and leadfield matrix,
allowing loading from files or creating them based on configuration.
"""
import logging
from dataclasses import dataclass
import numpy as np
from pathlib import Path
from mne.io.constants import FIFF
import mne
from typing import Dict, Optional, Union
from calibrain.utils import load_config
from sklearn.utils import check_random_state
@dataclass
class LeadfieldData:
leadfield: Optional[np.ndarray] = None
sensor_kind: Optional[int] = None
sensor_units: Optional[int] = None
sensor_unitmult: Optional[int] = None
coil_type: Optional[int] = None
src_coords: Optional[np.ndarray] = None
Q_basis: Optional[np.ndarray] = None
[docs]
class LeadfieldBuilder:
"""
Simulates leadfield matrices based on a configuration dictionary.
Handles the creation or loading of necessary MNE-Python components like
source space, BEM model, montage, info, and forward solution.
Attributes
----------
config : dict
The configuration dictionary driving the simulation process.
self.logger : logging.Logger
Logger instance for logging messages.
data_path : Path
Path to the data directory.
subject : str
Subject identifier.
subjects_dir : Path
Path to the FreeSurfer subjects directory.
save_path : Path
Base path where generated files (source space, BEM, etc.) will be saved.
"""
[docs]
def __init__(self, config: Dict = None, leadfield_dir: str = None, logger: Optional[logging.Logger] = None):
"""
Initialize the LeadfieldBuilder.
Parameters
----------
config : dict
Configuration dictionary containing paths and parameters for
each simulation step (data, source_space, bem_model, montage,
info, forward_solution, leadfield).
channel_type : str, optional
Type of channels to simulate ('eeg', 'meg', or 'grad'). If None,
all channel types will be simulated. Default is None.
leadfield_dir : str, optional
Directory where leadfield matrices are stored or will be saved.
self.logger : logging.Logger, optional
Custom self.logger instance. If None, a default self.logger is created,
by default None.
Raises
------
FileNotFoundError
If `data_path` or `subjects_dir` specified in the config do not exist.
"""
self.config = config
if self.config:
data_cfg = config["data"]
self.subject = data_cfg["subject"]
self.data_path = Path(data_cfg["data_path"])
self.subjects_dir = Path(data_cfg["subjects_dir"])
self.save_path = Path(data_cfg["save_path"])
if not self.data_path.exists():
raise FileNotFoundError(f"Data path does not exist: {self.data_path}")
if not self.subjects_dir.exists():
raise FileNotFoundError(f"Subjects directory does not exist: {self.subjects_dir}")
if not self.save_path.exists():
self.logger.info(f"Save path does not exist. Creating: {self.save_path}")
self.save_path.mkdir(parents=True, exist_ok=True)
self.logger = logger if logger else logging.getLogger(__name__)
self.leadfield_dir = Path(leadfield_dir)
# Parameters describing the simulated/loaded sensors
self.sensor_units: Optional[int] = None
self.sensor_unitmult: Optional[int] = None
self.sensor_kind: Optional[int] = None
self.coil_type: Optional[int] = None
def __getstate__(self):
state = self.__dict__.copy()
state['logger'] = None
return state
def __setstate__(self, state):
self.__dict__.update(state)
if self.logger is None:
self.logger = logging.getLogger(self.__class__.__name__)
def _set_sensor_metadata(
self,
*,
kind: Optional[Union[int, np.integer]] = None,
unit: Optional[Union[int, np.integer]] = None,
unit_mul: Optional[Union[int, np.integer]] = None,
coil_type: Optional[Union[int, np.integer]] = None,
) -> None:
"""Assign sensor metadata, coercing numpy scalars to ints."""
if kind is not None:
self.sensor_kind = int(kind) # 1 for FIFFV_MEG_CH, 2 for FIFFV_EEG_CH
if unit is not None:
self.sensor_units = int(unit)
if unit_mul is not None:
self.sensor_unitmult = int(unit_mul)
if coil_type is not None:
self.coil_type = int(coil_type)
def _scale_leadfield(
self,
leadfield: np.ndarray,
metadata: Optional[LeadfieldData] = None,
) -> np.ndarray:
"""Scale leadfield to fT/nAm or µV/nAm for better readability."""
sensor_units = metadata.sensor_units if metadata else self.sensor_units
sensor_unitmult = metadata.sensor_unitmult if metadata else self.sensor_unitmult
if sensor_units is None:
return leadfield
target_mul = None
scaler = None
if sensor_units in (FIFF.FIFF_UNIT_T, FIFF.FIFF_UNIT_T_M):
target_mul = FIFF.FIFF_UNITM_F # femto
# Convert data from T / (Am) to fT / (nAm):
# 1 T = 1e15 fT and 1 Am = 1e9 nAm, so the combined scaling factor is 1e6
scaler = 10 ** (-FIFF.FIFF_UNITM_F + FIFF.FIFF_UNITM_N)
elif sensor_units == FIFF.FIFF_UNIT_V:
target_mul = FIFF.FIFF_UNITM_MU # micro (10^-6)
# Convert from V / (nAm) to µV / (Am):
# 1 V = 1e6 µV and 1 nAm = 1e-9 Am -> combined scaler = 1e-3
scaler = 10 ** (-FIFF.FIFF_UNITM_MU + FIFF.FIFF_UNITM_N)
if target_mul is None or scaler is None:
return leadfield
if sensor_unitmult == target_mul:
return leadfield
if sensor_unitmult not in (None, FIFF.FIFF_UNITM_NONE):
return leadfield
leadfield = leadfield * scaler
if metadata is not None:
metadata.sensor_unitmult = target_mul
else:
self.sensor_unitmult = target_mul
return leadfield
def _reshape_free_orientation(self, leadfield: np.ndarray) -> np.ndarray:
"""Convert (n_sensors, 3 * n_sources) -> (n_sensors, n_sources, 3)."""
if leadfield.ndim != 2:
return leadfield
n_sensors, n_components = leadfield.shape
if n_components % 3 != 0:
raise ValueError(
f"Free-orientation leadfield has shape {leadfield.shape}; "
"the second dimension must be divisible by 3."
)
n_sources = n_components // 3
reshaped = leadfield.reshape(n_sensors, n_sources, 3)
self.logger.debug(
"Converted free-orientation leadfield from shape %s to %s",
leadfield.shape,
reshaped.shape,
)
return reshaped
def _store_sensor_metadata_from_info(self, info: Optional[mne.Info]) -> None:
"""Capture sensor metadata (kind/unit/unit_mul) from an info object."""
if info is None or "chs" not in info or not info["chs"]:
return
chs = info["chs"]
first = chs[0]
kind = first.get("kind")
unit = first.get("unit")
unit_mul = first.get("unit_mul")
coil_type = first.get("coil_type")
self._set_sensor_metadata(
kind=kind, unit=unit, unit_mul=unit_mul, coil_type=coil_type
)
[docs]
def handle_source_space(self) -> mne.SourceSpaces:
"""
Load or create the source space.
Reads the source space from the file specified in `config['source_space']['fname']`
if it exists. Otherwise, creates a new source space using
`mne.setup_source_space` with parameters from the configuration.
Saves the created source space if `config['source_space']['save']` is True.
Returns
-------
mne.SourceSpaces
The loaded or created source space object.
"""
source_space_cfg = self.config["source_space"]
fname = source_space_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
self.logger.info(f"Loading source space from file: {load_path}")
return mne.read_source_spaces(load_path)
else:
self.logger.warning(f"Source space file does not exist: {load_path}. Creating from scratch...")
else:
self.logger.info("No source space file specified. Creating from scratch...")
valid_kwargs = ["spacing", "surface", "add_dist", "n_jobs", "verbose"]
kwargs = {key: value for key, value in source_space_cfg.items() if key in valid_kwargs}
src = mne.setup_source_space(subject=self.subject, subjects_dir=self.subjects_dir, **kwargs)
if source_space_cfg.get("save", False):
save_fname = self.save_path / f"{self.subject}-src.fif"
verbose = source_space_cfg.get("verbose") # Let MNE handle default if None
overwrite = source_space_cfg.get("overwrite", False) # Default to False
self.logger.info(f"Saving source space to file: {save_fname}")
mne.write_source_spaces(save_fname, src, overwrite=overwrite, verbose=verbose)
return src
[docs]
def handle_bem_model(self) -> mne.bem.ConductorModel:
"""
Load or create the BEM (Boundary Element Model) solution.
Reads the BEM solution from the file specified in `config['bem_model']['fname']`
if it exists. Otherwise, creates a new BEM model using `mne.make_bem_model`
and computes the solution using `mne.make_bem_solution` with parameters
from the configuration. Saves the created BEM solution if
`config['bem_model']['save']` is True.
Returns
-------
mne.bem.ConductorModel
The loaded or created BEM solution object.
"""
bem_model_cfg = self.config["bem_model"]
fname = bem_model_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
self.logger.info(f"Loading BEM model from file: {load_path}")
return mne.read_bem_solution(load_path)
else:
self.logger.warning(f"BEM model file does not exist: {load_path}. Creating from scratch...")
else:
self.logger.info("No BEM model file specified. Creating from scratch...")
valid_model_kwargs = ["ico", "conductivity", "verbose"]
model_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_model_kwargs}
bem_model = mne.make_bem_model(subject=self.subject, subjects_dir=self.subjects_dir, **model_kwargs)
valid_sol_kwargs = ["solver", "verbose"]
sol_kwargs = {key: value for key, value in bem_model_cfg.items() if key in valid_sol_kwargs}
bem = mne.make_bem_solution(surfs=bem_model, **sol_kwargs)
if bem_model_cfg.get("save", False):
save_fname = self.save_path / f"{self.subject}-bem.fif"
verbose = bem_model_cfg.get("verbose")
overwrite = bem_model_cfg.get("overwrite", False)
self.logger.info(f"Saving BEM model to file: {save_fname}")
mne.write_bem_solution(save_fname, bem, overwrite=overwrite, verbose=verbose)
return bem
[docs]
def handle_montage(self) -> mne.channels.DigMontage:
"""
Load or create the sensor montage.
Reads the montage from the file specified in `config['montage']['fname']`
if it exists. Otherwise, creates a standard montage using
`mne.channels.make_standard_montage` with parameters from the configuration.
Saves the created montage if `config['montage']['save']` is True.
Returns
-------
mne.channels.DigMontage
The loaded or created montage object.
"""
montage_cfg = self.config.get("montage", {})
fname = montage_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
self.logger.info(f"Loading montage from file: {load_path}")
# Assuming FIF format based on save method
return mne.channels.read_dig_fif(load_path)
else:
self.logger.warning(f"Montage file does not exist: {load_path}. Creating from scratch...")
else:
self.logger.info("No montage file specified. Creating from scratch...")
valid_kwargs = ["kind", "head_size"] # Parameters for make_standard_montage
kwargs = {key: value for key, value in montage_cfg.items() if key in valid_kwargs}
if not kwargs.get("kind"):
self.logger.warning("Montage 'kind' not specified in config, cannot create standard montage. Returning None.")
return None # Cannot create without kind
montage = mne.channels.make_standard_montage(**kwargs)
if montage_cfg.get("save", False):
save_fname = self.save_path / f"{self.subject}-montage.fif"
verbose = montage_cfg.get("verbose")
overwrite = montage_cfg.get("overwrite", False)
self.logger.info(f"Saving montage to file: {save_fname}")
montage.save(save_fname, overwrite=overwrite, verbose=verbose)
return montage
[docs]
def handle_info(self) -> mne.Info:
"""
Load or create the MNE measurement info object.
Reads the info from the file specified in `config['info']['fname']`
if it exists. Otherwise, creates a new info object using `mne.create_info`
with parameters from the configuration, associating it with the montage
obtained via `handle_montage`. Saves the created info object if
`config['info']['save']` is True.
Returns
-------
mne.Info
The loaded or created info object.
Raises
------
ValueError
If info cannot be created because the montage is missing or invalid.
"""
info_cfg = self.config["info"]
fname = info_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
self.logger.info(f"Loading info object from file: {load_path}")
info = mne.io.read_info(load_path)
self._store_sensor_metadata_from_info(info)
return info
else:
self.logger.warning(f"Info file does not exist: {load_path}. Creating from scratch...")
else:
self.logger.info("No info file specified. Creating from scratch...")
montage = self.handle_montage()
if montage is None or not hasattr(montage, 'ch_names'):
raise ValueError("Cannot create info object without a valid montage.")
valid_kwargs = ["sfreq", "ch_types", "verbose"]
kwargs = {key: value for key, value in info_cfg.items() if key in valid_kwargs}
if not kwargs.get("sfreq") or not kwargs.get("ch_types"):
raise ValueError("Cannot create info object without 'sfreq' and 'ch_types' in config.")
info = mne.create_info(
ch_names=montage.ch_names,
**kwargs
)
info.set_montage(montage)
self._store_sensor_metadata_from_info(info)
if info_cfg.get("save", False):
save_fname = self.save_path / f"{self.subject}-info.fif"
# write_info doesn't have overwrite, it overwrites by default
self.logger.info(f"Saving info object to file: {save_fname}")
mne.io.write_info(save_fname, info)
return info
[docs]
def handle_forward_solution(self, info: mne.Info, src: mne.SourceSpaces, bem: mne.bem.ConductorModel) -> mne.Forward:
"""
Load or create the forward solution.
Reads the forward solution from the file specified in
`config['forward_solution']['fname']` if it exists and has a valid suffix.
Otherwise, creates a new forward solution using `mne.make_forward_solution`
with parameters from the configuration. Converts the solution to fixed
orientation if `config['forward_solution']['orientation_type']` is 'fixed'.
Saves the created forward solution if `config['forward_solution']['save']` is True.
Parameters
----------
info : mne.Info
The measurement info object.
src : mne.SourceSpaces
The source space object.
bem : mne.bem.ConductorModel
The BEM solution object.
Returns
-------
mne.Forward
The loaded or created forward solution object.
"""
forward_solution_cfg = self.config.get("forward_solution", {})
orientation_type = forward_solution_cfg.get("orientation_type", "free") # Default to free
fname = forward_solution_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
# Basic check for expected suffix
if not (load_path.name.endswith("fixed-fwd.fif") or load_path.name.endswith("free-fwd.fif")):
self.logger.warning(f"File {load_path} does not have a standard forward solution suffix. Attempting to load anyway.")
try:
self.logger.info(f"Loading forward solution from file: {load_path}")
fwd = mne.read_forward_solution(load_path)
self._store_sensor_metadata_from_info(fwd["info"])
# Optional: Check if loaded orientation matches config
loaded_ori = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free"
if loaded_ori != orientation_type:
self.logger.warning(f"Loaded forward solution orientation ('{loaded_ori}') does not match config ('{orientation_type}'). Using loaded orientation.")
orientation_type = loaded_ori # Update based on loaded file
return fwd
except Exception as e:
self.logger.warning(f"Failed to load forward solution from file: {e}. Proceeding to create it.")
else:
self.logger.warning(f"Forward solution file does not exist: {load_path}. Proceeding to create it.")
else:
self.logger.info("No forward solution file specified. Proceeding to create it.")
self.logger.info("Creating forward solution...")
valid_kwargs = ["trans", "eeg", "meg", "mindist", "ignore_ref", "n_jobs", "verbose"]
kwargs = {key: value for key, value in forward_solution_cfg.items() if key in valid_kwargs}
# Ensure trans is provided if needed
if 'trans' not in kwargs or not kwargs['trans']:
self.logger.warning("Transformation file ('trans') not specified in forward_solution config. Using default 'fsaverage-trans.fif'. This might be incorrect.")
kwargs['trans'] = 'fsaverage-trans.fif' # Or handle error differently
fwd = mne.make_forward_solution(
info=info,
src=src,
bem=bem,
**kwargs
)
self.logger.info("Forward solution created successfully.")
self._store_sensor_metadata_from_info(fwd["info"])
if orientation_type == "fixed":
surf_ori = forward_solution_cfg.get("surf_ori", True)
force_fixed = forward_solution_cfg.get("force_fixed", True)
self.logger.info(f"Converting forward solution to fixed orientation (surf_ori={surf_ori}, force_fixed={force_fixed})...")
fwd = mne.convert_forward_solution(fwd, force_fixed=force_fixed, surf_ori=surf_ori)
self.logger.info("Orientation fixed successfully.")
elif fwd["source_ori"] != FIFF.FIFFV_MNE_FREE_ORI:
# If config asked for free but make_forward_solution didn't yield it (unlikely)
self.logger.warning("Expected free orientation but forward solution is not. Check configuration.")
if forward_solution_cfg.get("save", False):
# Determine final orientation type for saving filename
final_orientation = "fixed" if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI else "free"
save_fname = self.save_path / f"{self.subject}-{final_orientation}-fwd.fif"
verbose = forward_solution_cfg.get("verbose")
overwrite = forward_solution_cfg.get("overwrite", False)
self.logger.info(f"Saving forward solution to file: {save_fname}")
mne.write_forward_solution(save_fname, fwd, overwrite=overwrite, verbose=verbose)
return fwd
[docs]
def handle_leadfield(self, fwd: Optional[mne.Forward] = None) -> np.ndarray:
"""
Load or extract the leadfield matrix.
Reads the leadfield matrix from the .npz file specified in
`config['leadfield']['fname']` if it exists. Otherwise, extracts the
leadfield data from the provided forward solution `fwd`. Reshapes the
matrix for free orientation if necessary. Saves the extracted leadfield
matrix if `config['leadfield']['save']` is True.
Parameters
----------
fwd : mne.Forward, optional
The forward solution object. Required if the leadfield matrix is not
loaded from a file, by default None.
Returns
-------
np.ndarray
The leadfield matrix. Shape is (n_sensors, n_sources) for fixed
orientation or (n_sensors, n_sources, 3) for free orientation.
Raises
------
ValueError
If the leadfield cannot be loaded from a file and `fwd` is not provided,
or if a loaded leadfield file is invalid.
"""
leadfield_cfg = self.config.get("leadfield", {})
fname = leadfield_cfg.get("fname")
if fname:
load_path = Path(fname)
if load_path.exists():
try:
self.logger.debug(f"Loading leadfield matrix from file: {load_path}")
with np.load(load_path) as data:
if "leadfield" not in data:
raise ValueError(f"Invalid .npz file: 'leadfield' key not found in {load_path}")
leadfield = data["leadfield"]
# Basic validation based on expected dimensions
if leadfield.ndim == 2:
self.logger.info(f"Loaded fixed orientation leadfield: {leadfield.shape}")
elif leadfield.ndim == 3 and leadfield.shape[-1] == 3:
self.logger.info(f"Loaded free orientation leadfield: {leadfield.shape}")
else:
raise ValueError(f"Loaded leadfield has unexpected shape: {leadfield.shape}")
leadfield = self._scale_leadfield(leadfield)
return leadfield
except Exception as e:
self.logger.warning(f"Failed to load leadfield matrix from file: {e}. Proceeding to compute it.")
else:
self.logger.warning(f"Leadfield file does not exist: {load_path}. Proceeding to compute it.")
else:
self.logger.info("No leadfield file specified. Proceeding to compute it.")
if fwd is None:
raise ValueError("Forward solution (fwd) must be provided to compute leadfield matrix when not loading from file.")
self._store_sensor_metadata_from_info(fwd["info"])
self.logger.info("Extracting leadfield matrix from the forward solution...")
leadfield = fwd["sol"]["data"]
orientation_type = None
if fwd["source_ori"] == FIFF.FIFFV_MNE_FIXED_ORI:
self.logger.info(f"Extracted fixed orientation leadfield: {leadfield.shape}")
orientation_type = "fixed"
# Shape is already (n_sensors, n_sources)
elif fwd["source_ori"] == FIFF.FIFFV_MNE_FREE_ORI:
self.logger.info("Extracted free orientation leadfield (raw shape: %s)", leadfield.shape)
n_sensors, n_sources_x_orient = leadfield.shape
n_orient = 3
if n_sources_x_orient % n_orient != 0:
raise ValueError(f"Cannot reshape free orientation leadfield. Shape {leadfield.shape} not divisible by 3.")
n_sources = n_sources_x_orient // n_orient
leadfield = leadfield.reshape(n_sensors, n_sources, n_orient)
self.logger.info(f"Reshaped free orientation leadfield: {leadfield.shape}")
orientation_type = "free"
else:
self.logger.warning("Unknown leadfield orientation type in forward solution.")
leadfield = self._scale_leadfield(leadfield)
if leadfield_cfg.get("save", False) and orientation_type:
save_fname = self.save_path / f"{self.subject}-leadfield-{orientation_type}.npz"
self.logger.info(f"Saving computed leadfield matrix to file: {save_fname}")
payload = {"leadfield": leadfield}
if self.sensor_units is not None:
payload["sensor_units"] = self.sensor_units
if self.sensor_unitmult is not None:
payload["sensor_unitmult"] = self.sensor_unitmult
if self.sensor_kind is not None:
payload["sensor_kind"] = self.sensor_kind
if self.coil_type is not None:
payload["coil_type"] = self.coil_type
np.savez(save_fname, **payload)
return leadfield
[docs]
def simulate(self) -> np.ndarray:
"""
Run the full simulation pipeline.
Executes the sequence of handling/creating source space, BEM model,
info object, forward solution, and finally the leadfield matrix,
based on the provided configuration.
Returns
-------
np.ndarray
The final leadfield matrix. Shape depends on the orientation type.
Raises
------
RuntimeError
If any step in the pipeline fails.
"""
self.logger.info("Starting leadfield simulation pipeline...")
try:
src = self.handle_source_space()
bem = self.handle_bem_model()
info = self.handle_info()
fwd = self.handle_forward_solution(info, src, bem)
leadfield = self.handle_leadfield(fwd)
except Exception as e:
self.logger.error(f"Pipeline failed: {e}", exc_info=True) # Log traceback
raise RuntimeError("Simulation pipeline failed.") from e
self.logger.info("Leadfield simulation pipeline completed successfully.")
return leadfield
[docs]
def get_leadfield(
self,
subject: str = "fsaverage",
orientation_type: str = "fixed",
retrieve_mode: str = "load",
config: Optional[Union[str, Path, Dict]] = None,
n_sensors: int = 64,
n_sources: int = 1284,
return_metadata: bool = False,
) -> Union[np.ndarray, LeadfieldData]:
"""
Get or generate the leadfield matrix based on the specified mode.
Updates self.n_sensors and self.n_sources based on the obtained leadfield.
Parameters
----------
subject : str
Returns
-------
np.ndarray or LeadfieldData
The leadfield matrix (L) when ``return_metadata`` is False.
When True, returns a ``LeadfieldData`` object that includes both the
leadfield matrix and its associated sensor metadata.
Raises
------
ValueError
If retrieve_mode is invalid, required paths are missing,
or loaded/simulated leadfield has unexpected dimensions/format.
FileNotFoundError
If leadfield_dir does not exist when mode='load'.
"""
if orientation_type == "fixed":
expected_dimensions = 2 # Fixed orientation leads to 2D leadfield (n_sensors, n_sources)
expected_suffix = "-fixed.npz"
elif orientation_type == "free":
expected_suffix = "-free.npz"
expected_dimensions = 3 # Free orientation leads to 3D leadfield (n_sensors, n_sources, 3)
else:
raise ValueError(f"Invalid orientation_type '{orientation_type}'. Choose 'fixed' or 'free'.")
metadata = LeadfieldData()
if retrieve_mode == "load":
# Define the two specific patterns you were trying to match:
# Pattern 1: Includes orientation_type in the filename
leadfield_path = self.leadfield_dir / f"{subject}_{orientation_type}_leadfield.npz"
try:
if leadfield_path.exists():
leadfield_path = leadfield_path
else:
self.logger.warning(
f"Leadfield file not found for subject '{subject}' with orientation '{orientation_type}' "
f"in directory '{self.leadfield_dir}'.\n"
f"Checked specific patterns:\n"
f" - {leadfield_path}"
)
raise FileNotFoundError(
f"Leadfield file not found for subject '{subject}' in directory '{self.leadfield_dir}'."
)
self.logger.debug(f"Loading leadfield matrix from file: {leadfield_path}")
with np.load(leadfield_path, allow_pickle=True) as data:
if "leadfield" not in data and "leadfield" not in data:
raise ValueError(f"File {leadfield_path} does not contain 'leadfield' or 'leadfield' key.")
leadfield = data["leadfield"] if "leadfield" in data else data["leadfield"]
metadata = LeadfieldData(
sensor_kind=data.get("sensor_kind"),
sensor_units=data.get("sensor_units", data.get("units")),
sensor_unitmult=data.get("sensor_unitmult"),
coil_type=data.get("coil_type"),
src_coords=data.get("src_coords"),
Q_basis=data.get("Q_basis"),
)
leadfield = self._scale_leadfield(leadfield, metadata=metadata)
# if orientation_type == "free":
# leadfield = self._reshape_free_orientation(leadfield)
if leadfield.ndim != expected_dimensions:
raise ValueError(
f"Loaded leadfield matrix dimension mismatch for orientation '{orientation_type}': "
f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}."
)
self.logger.debug(f"Leadfield loaded with shape {leadfield.shape}")
except (FileNotFoundError, ValueError) as e:
self.logger.error(f"Failed to load leadfield matrix: {e}")
raise
elif retrieve_mode == "simulate":
self.logger.debug(f"Simulating leadfield matrix using LeadfieldBuilder with config: {config}")
try:
if config is not None:
if isinstance(config, (str, Path)):
config = load_config(Path(config))
L_simulator = LeadfieldBuilder(
config=config,
leadfield_dir=self.leadfield_dir,
logger=self.logger,
)
else:
L_simulator = self
leadfield = L_simulator.simulate()
self.logger.info(f"Simulated leadfield matrix with shape {leadfield.shape}")
metadata = LeadfieldData(
sensor_units=L_simulator.sensor_units,
sensor_unitmult=L_simulator.sensor_unitmult,
sensor_kind=L_simulator.sensor_kind,
coil_type=L_simulator.coil_type,
src_coords=getattr(L_simulator, "src_coords", None),
Q_basis=getattr(L_simulator, "Q_basis", None),
)
if orientation_type == "free":
leadfield = self._reshape_free_orientation(leadfield)
if leadfield.ndim != expected_dimensions:
raise ValueError(
f"Simulated leadfield matrix dimension mismatch for orientation '{orientation_type}': "
f"expected {expected_dimensions} dimensions, but got {leadfield.ndim}."
)
# TODO: Handle channel units based on L_simulator.channel_type
# if L_simulator.channel_type == "eeg":
# self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0]
# elif L_simulator.channel_type == "meg":
# self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0]
# elif L_simulator.channel_type == "grad":
# self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0]
# elif not L_simulator.channel_type:
# self.sensor_units = [FIFF.FIFF_UNIT_UNKNOWN] * leadfield.shape[0]
# self.logger.warning("Channel type not specified. Defaulting to 'unknown'.")
# else:
# raise ValueError(f"Invalid channel_type '{L_simulator.channel_type}'. Choose 'eeg', 'meg', or 'grad'.")
except Exception as e:
self.logger.error(f"Failed to simulate leadfield matrix: {e}")
raise
elif retrieve_mode == "random":
self.logger.info(f"Generating a random leadfield matrix (n_sensors={n_sensors}, n_sources={n_sources}).")
rng = check_random_state(64)
if orientation_type == "fixed":
leadfield = rng.standard_normal((n_sensors, n_sources))
else:
leadfield = rng.standard_normal((n_sensors, n_sources, 3))
self.logger.info(f"Random leadfield generated with shape {leadfield.shape}")
# TODO: Handle channel units based on self.channel_type
# if self.channel_type == "eeg":
# self.sensor_units = [FIFF.FIFF_UNIT_V] * leadfield.shape[0]
# elif self.channel_type == "meg":
# self.sensor_units = [FIFF.FIFF_UNIT_T] * leadfield.shape[0]
# elif self.channel_type == "grad":
# self.sensor_units = [FIFF.FIFF_UNIT_T_M] * leadfield.shape[0]
# else:
# raise ValueError(f"Invalid channel_type '{self.channel_type}'. Choose 'eeg', 'meg', or 'grad'.")
else:
raise ValueError(f"Invalid leadfield mode '{retrieve_mode}'. Options are 'load', 'simulate', or 'random'.")
# Update n_sensors and n_sources based on the actual leadfield dimensions
if leadfield.ndim == 2 == expected_dimensions: # Fixed
n_sensors, n_sources = leadfield.shape
elif leadfield.ndim == 3 == expected_dimensions: # Free
n_sensors, n_sources, _ = leadfield.shape
self.logger.debug(f"Leadfield obtained. Updated n_sensors={n_sensors}, n_sources={n_sources}")
metadata.leadfield = leadfield
metadata.src_coords = metadata.src_coords
self._set_sensor_metadata(
kind=metadata.sensor_kind,
unit=metadata.sensor_units,
unit_mul=metadata.sensor_unitmult,
coil_type=metadata.coil_type,
)
if return_metadata:
return metadata
return leadfield