Source code for calibrain.utils

from typing import Any, Dict, Union, Optional
import logging
from pathlib import Path
import os
import runpy

import mne
import numpy as np
import yaml


[docs] def load_config(config_file: str, logger=None) -> dict: """ Load the configuration from a YAML file. Parameters ---------- config_file : str Path to the YAML configuration file. logger : logging.Logger, optional Logger instance for logging messages. If None, a default logger will be created. Raises ------ FileNotFoundError If the configuration file is not found. yaml.YAMLError If there is an error parsing the YAML file. ValueError If the configuration file is empty or invalid. Returns ------- config : dict The loaded configuration as a dictionary. """ if not config_file.exists(): raise FileNotFoundError(f"Configuration file does not exist: {config_file}") logger = logger if logger else logging.getLogger(__name__) logger.info(f"Loading configuration from file: {config_file}") try: with open(config_file, "r") as f: config = yaml.safe_load(f) if not isinstance(config, dict): raise TypeError("The configuration file must contain a dictionary at the top level.") if not config: raise ValueError("The configuration file is empty or invalid.") logger.info("Configuration successfully loaded.") return config except FileNotFoundError: logger.error(f"Configuration file not found: {config_file}") raise except yaml.YAMLError as e: logger.error(f"Error parsing YAML file: {e}") raise
[docs] def load_python_config(config_path: Path | str) -> Dict[str, Any]: """Execute a Python config file and return its CONFIG dict.""" namespace = runpy.run_path(str(config_path)) if "CONFIG" not in namespace: raise ValueError(f"Config {config_path} must define a CONFIG dict.") return namespace["CONFIG"]
def save_subjects_mne_info(subjects=["CC120166", "CC120264", "CC120309", "CC120313"], fwd_dir='examples/BSI-ZOO_forward_data'): for subject in subjects: fwd_file = f'{fwd_dir}/{subject}-fwd.fif' print(f'Processing forward solution for subject: {subject}') # Load the forward solution fwd = mne.read_forward_solution(fwd_file) # Extract the info object info = fwd['info'] # Add artificial empty events to avoid KeyError if "events" not in info: info["events"] = [] # Save the info object to a file info_file = f'{fwd_dir}/{subject}-info.fif' mne.io.write_info(info_file, info) def inspect_object(obj, show_private=False): """ Print attributes and methods of a Python object separately. Parameters ---------- obj The object to inspect. show_private : bool If True, include private attributes/methods (starting with '_'). Returns ------- dict Dictionary with 'attributes' and 'methods' keys. """ def is_valid(name): return show_private or not name.startswith("_") attributes = [a for a in dir(obj) if not callable(getattr(obj, a)) and is_valid(a)] methods = [m for m in dir(obj) if callable(getattr(obj, m)) and is_valid(m)] print("Attributes:") for attr in attributes: print(f" - {attr}") print("\nMethods:") for method in methods: print(f" - {method}") return {"attributes": attributes, "methods": methods}
[docs] def get_data_path(path: Optional[Union[str, Path]] = None) -> Path: """Get the path to CaliBrain data directory. Parameters ---------- path : str | Path | None Custom path to data directory. If None, uses default location. Returns ------- data_path : Path Path to CaliBrain data directory. """ if path is None: # Use environment variable or default location in calibrain package if 'CALIBRAIN_DATA' in os.environ: data_path = Path(os.environ['CALIBRAIN_DATA']) else: # Default to calibrain/data directory relative to this module calibrain_root = Path(__file__).parent.parent data_path = calibrain_root / 'data' data_path = Path(data_path) else: data_path = Path(path) # Create directory if it doesn't exist data_path.mkdir(parents=True, exist_ok=True) return data_path
def restrict_fwd_to_sources( fwd, n_keep=1284, seed=None, ): """ Randomly reduce a forward solution to a subset of sources on both hemispheres. Parameters ---------- fwd : mne.Forward Forward solution to restrict. n_keep : int Number of sources to keep per hemisphere. seed : int | None Random seed for selecting vertices. """ if not isinstance(n_keep, int): raise TypeError("n_keep must be an integer.") if n_keep <= 0: raise ValueError("n_keep must be positive.") vertices = [np.array([], dtype=int), np.array([], dtype=int)] rng = np.random.default_rng(seed) data_segments = [] for hemi_idx in (0, 1): hemi_vertices = fwd["src"][hemi_idx]["vertno"] if n_keep > len(hemi_vertices): hemi = "lh" if hemi_idx == 0 else "rh" raise ValueError(f"Only {len(hemi_vertices)} sources in {hemi}") sel = np.sort(rng.choice(len(hemi_vertices), size=n_keep, replace=False)) vertices[hemi_idx] = hemi_vertices[sel] data_segments.append(np.ones((n_keep, 1))) data = np.vstack(data_segments) stc = mne.SourceEstimate(data, vertices=vertices, tmin=0.0, tstep=1.0) return mne.forward.restrict_forward_to_stc(fwd, stc, on_missing="raise")