import os
import re
import csv
import sys
import math
import fnmatch
import platform
import operator
from functools import wraps
import numpy as np
import pandas as pd
import nibabel as nb
from matplotlib import cm
from nilearn import signal
from PIL import ImageColor
from shapely import geometry
import matplotlib.colors as mcolors
opj = os.path.join
[docs]
def get_first_existing(dictionary, keys, fallback=None, msg=None):
"""
Return the first existing key from a list in the given dictionary.
If none are found, return fallback (if provided), else None.
Parameters
----------
dictionary : dict
The dictionary to search in.
keys : list
List of keys to check in priority order.
fallback : any, optional
Value to return if none of the keys are found.
msg : str, optional
Extra message in case fallback option is selected
Returns
-------
value from dictionary or fallback
"""
for key in keys:
if key in dictionary:
return dictionary[key]
if isinstance(msg, str):
verbose(msg, True)
return fallback
[docs]
def make_polar_cmap():
"""make_polar_cmap
Creates a colormap with an extended HSV color spectrum to denote the polar angle in pRF mapping.
Returns
----------
matplotlib.colors.ListedColormap
Custom colormap with a doubled HSV range.
"""
top = cm.get_cmap('hsv', 256)
bottom = cm.get_cmap('hsv', 256)
newcolors = np.vstack((top(np.linspace(0, 1, 256)), bottom(np.linspace(0, 1, 256))))
cmap = mcolors.ListedColormap(newcolors, name='hsvx2')
return cmap
[docs]
def get_vertex_nr(subject, as_list=False, debug=False, fs_dir=None):
"""get_vertex_nr
Retrieve the number of vertices in the cortical surface for a given subject.
Parameters
----------
subject : str
Subject ID.
as_list : bool, optional
Whether to return left and right hemisphere vertex counts separately, by default False.
debug : bool, optional
Whether to print debug information, by default False.
fs_dir : str, optional
Path to FreeSurfer subjects directory, by default reads from environment variable `SUBJECTS_DIR`.
Returns
----------
int or list
Total number of vertices (int) or a list of hemisphere-specific counts if `as_list=True`.
Raises
----------
FileNotFoundError
If the surface file is not found.
"""
if not isinstance(fs_dir, str):
fs_dir = os.environ.get("SUBJECTS_DIR")
n_verts_fs = []
for i in ['lh', 'rh']:
surf = opj(fs_dir, subject, 'surf', f'{i}.white')
verbose(surf, debug)
if not os.path.exists(surf):
raise FileNotFoundError(f"Could not find file '{surf}'")
try:
verts = nb.freesurfer.io.read_geometry(surf)[0].shape[0]
except:
annot = opj(fs_dir, subject, 'label', f'{i}.aparc.a2009s.annot')
verts = nb.freesurfer.io.read_annot(annot)[0].shape[0]
n_verts_fs.append(verts)
if as_list:
return n_verts_fs
else:
return sum(n_verts_fs)
[docs]
def disassemble_fmriprep_wf(wf_path, subj_ID, prefix="sub-"):
"""disassemble_fmriprep_wf
Parses the workflow folder from fMRIPrep into a filename based on subject-specific components.
Parameters
----------
wf_path : str
Path to workflow folder.
subj_ID : str
Subject ID to append to `prefix`.
prefix : str, optional
Prefix for the filename (default is `"sub-"`).
Returns
----------
str
Constructed filename based on workflow parts.
Example
----------
.. code-block:: python
from lazyfmri.utils import disassemble_fmriprep_wf
wf_dir = "func_preproc_ses_2_task_pRF_run_1_acq_3DEPI_wf"
fname = disassemble_fmriprep_wf(wf_dir, "001")
print(fname) # 'sub-001_ses-2_task-pRF_acq-3DEPI_run-1'
"""
wf_name = [ii for ii in wf_path.split(os.sep) if "func_preproc" in ii][0]
wf_elem = wf_name.split("_")
fname = [f"{prefix}{subj_ID}"]
for tag in ['ses', 'task', 'acq', 'run']:
if tag in wf_elem:
idx = wf_elem.index(tag)+1
fname.append(f"{tag}-{wf_elem[idx]}")
fname = "_".join(fname)
return fname
[docs]
def assemble_fmriprep_wf(bold_path, wf_only=False):
"""assemble_fmriprep_wf
Converts a BOLD file path into a workflow name for fMRIPrep.
Parameters
----------
bold_path : str
Path to the BOLD file.
wf_only : bool, optional
Whether to return only the workflow name without `single_subject_<sub_id>_wf`, by default False.
Returns
----------
str
Constructed workflow name.
Example
----------
.. code-block:: python
from lazyfmri.utils import assemble_fmriprep_wf
bold_file = "sub-008_ses-2_task-SRFi_acq-3DEPI_run-1_desc-preproc_bold.nii.gz"
wf_name = assemble_fmriprep_wf(bold_file)
print(wf_name) # 'single_subject_008_wf/func_preproc_ses_2_task_SRFi_run_1_acq_3DEPI_wf'
.. code-block:: python
wf_name = assemble_fmriprep_wf(bold_file, wf_only=True)
print(wf_name) # 'func_preproc_ses_2_task_SRFi_run_1_acq_3DEPI_wf'
"""
bids_comps = split_bids_components(os.path.basename(bold_path))
fname = ["func_preproc"]
for tag in ['ses', 'task', 'run', 'acq']:
if tag in list(bids_comps.keys()):
fname.append(f"{tag}_{bids_comps[tag]}")
base_dir = ""
fname = "_".join(fname)+"_wf"
if 'sub' in list(bids_comps.keys()):
base_dir = f"single_subject_{bids_comps['sub']}_wf"
if wf_only:
return fname
else:
return opj(base_dir, fname)
else:
return fname
[docs]
def select_files_from_bids(
inputdir,
filters=['acq-MP2RAGE_', 'inv'],
get_kws={},
**kwargs):
"""
Selects files from a BIDS-formatted directory using pybids and optional substring filters.
This function wraps around `BIDSLayout.get()` to retrieve NIfTI files from a BIDS dataset,
with optional keyword arguments and substring filtering.
Parameters
----------
inputdir : str
Path to the BIDS dataset directory or a subfolder (e.g., `.../sub-01/anat`).
filters : str or list of str, optional
Substrings used to filter the resulting list of files. Only files containing all
specified substrings will be returned. If set to `None`, all matching files are returned.
Default is ['acq-MP2RAGE_', 'inv'].
get_kws : dict, optional
Additional keyword arguments to pass to `BIDSLayout.get()`. These are merged with
default values for 'extension' and 'return_type'.
**kwargs : dict
Additional keyword arguments passed to the `BIDSLayout` constructor (e.g., `config`, `database_path`).
Returns
-------
list of str
List of file paths that match the specified filters and BIDS selection criteria.
Notes
-----
- The function defaults to selecting `.nii.gz` files and returns their full paths.
- If `inputdir` ends with `"anat"`, the BIDS data_type is automatically set to `"anat"`.
- If `filters` is provided, the results are further reduced to only those containing the given substring(s).
"""
from bids import BIDSLayout
# update kwargs
def_dict = {
"extension": ["nii.gz"],
"return_type": "file"
}
for key, val in def_dict.items():
get_kws = update_kwargs(
get_kws,
key,
val
)
if inputdir.endswith("anat"):
get_kws = update_kwargs(
get_kws,
"data_type",
"anat"
)
bids_fn = BIDSLayout(
inputdir,
validate=False,
**kwargs).get(**get_kws)
if isinstance(filters, (str, list)):
filt_files = get_file_from_substring(filters, bids_fn)
filt_files.sort()
return filt_files
else:
return bids_fn
[docs]
class BIDSFile:
"""BIDSFile
Class to extract BIDS-related components from filenames and paths.
Parameters
----------
bids_file : str
Path to the BIDS-compliant file.
Methods
----------
`get_bids_basepath()`
Retrieve the BIDS base path.
`get_bids_root()`
Retrieve the BIDS root directory.
`get_bids_workbase()`
Retrieve the BIDS working directory.
`get_bids_workflow()`
Retrieve the workflow name for fMRIPrep.
`get_bids_ids()`
Extract subject, session, task, and run information from filename.
"""
def __init__(self, bids_file):
self.bids_file = os.path.abspath(bids_file)
[docs]
def get_bids_basepath(self, *args):
return self._get_bids_basepath(self.bids_file, *args)
[docs]
def get_bids_root(self, *args):
return self._get_bids_root(self.bids_file, *args)
[docs]
def get_bids_workbase(self, *args):
return self._get_bids_workbase(self.bids_file, *args)
[docs]
def get_bids_workflow(self, **kwargs):
return assemble_fmriprep_wf(self.bids_file, **kwargs)
# def get_bids_root(self):
@staticmethod
def _get_bids_basepath(file, pref="sub"):
sp = file.split(os.sep)
for i in sp:
if i.startswith(pref) and not i.endswith('.nii.gz'):
base_path = os.sep.join(sp[sp.index(i)+1:-1])
break
return base_path
# def get_bids_root(self):
@staticmethod
def _get_bids_workbase(file, pref="sub"):
sp = file.split(os.sep)
for i in sp:
if i.startswith(pref) and not i.endswith('.nii.gz'):
base_path = os.sep.join(sp[sp.index(i):-2])
break
return base_path
@staticmethod
def _get_bids_root(file, pref="sub"):
sp = file.split(os.sep)
for i in sp:
if i.startswith(pref) and not i.endswith('.nii.gz'):
bids_root = os.sep.join(sp[:sp.index(i)])
break
return bids_root
[docs]
def get_bids_ids(self, **kwargs):
return split_bids_components(self.bids_file, **kwargs)
[docs]
class color:
# """color
# Add some color to the terminal.
# Example
# ----------
# print("set orientation to " + utils.color.BOLD + utils.color.RED + "SOME TEXT THAT'LL BE IN TED" + utils.color.END)
# """
PURPLE = '\033[95m'
CYAN = '\033[96m'
DARKCYAN = '\033[36m'
BLUE = '\033[34m'
GREEN = '\033[32m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
MAGENTA = "\033[35m"
END = '\033[0m'
[docs]
def convert2unit(v, method="np"):
"""convert2unit
Normalize a vector to a unit vector.
Parameters
----------
v : numpy.ndarray
Vector to be normalized.
method : str, optional
Method for normalization (`"np"` for NumPy, `"mesh"` for mesh-based normalization), by default `"np"`.
Returns
----------
numpy.ndarray
Unit-normalized vector.
"""
if method.lower() == "np":
v_hat = v / np.linalg.norm(v)
return v_hat
elif method.lower() == "mesh":
# https://sites.google.com/site/dlampetest/python/calculating-normals-of-a-triangle-mesh-using-numpy
lens = np.sqrt( v[:,0]**2 + v[:,1]**2 + v[:,2]**2 )
v[:,0] /= lens
v[:,1] /= lens
v[:,2] /= lens
return v
[docs]
def string2list(string_array, make_float=False):
"""string2list
Convert a string representation of a list into an actual Python list.
Parameters
----------
string_array : str
String containing a list representation.
make_float : bool, optional
Whether to convert list elements to float, by default False.
Returns
----------
list
Parsed list.
Example
----------
.. code-block:: python
from lazyfmri.utils import string2list
lst = string2list('[tc,bgfs]')
print(lst) # ['tc', 'bgfs']
"""
if type(string_array) == str:
new = string_array.split(',')[0:]
new = list(filter(None, new))
if make_float:
new = [float(ii) for ii in new]
return new
else:
# array is already in non-string format
return string_array
[docs]
def string2float(string_array):
"""string2float
This function converts a array in string representation to a regular float array. This can happen, for instance, when
you've stored a numpy array in a pandas dataframe (such is the case with the 'normal' vector). It starts by splitting
based on empty spaces, filter these, and convert any remaining elements to floats and returns these in an array.
Parameters
----------
string_array: str
string to be converted to a valid numpy array with float values
Returns
----------
numpy.ndarray
array containing elements in float rather than in string representation
Example
----------
string2float('[ -7.42 -92.97 -15.28]')
array([ -7.42, -92.97, -15.28])
"""
if type(string_array) == str:
new = string_array[1:-1].split(' ')[0:]
new = list(filter(None, new))
new = [float(i.strip(",")) for i in new]
new = np.array(new)
return new
else:
# array is already in non-string format
return string_array
[docs]
def print_cmd_highlight(cmd):
"""
Print a shell command with the executable in bold blue, using the `color` class.
Parameters
----------
cmd : str
The full command string to print.
"""
parts = cmd.strip().split(" ", 1)
executable = parts[0]
rest = parts[1] if len(parts) > 1 else ""
print(f"{color.BOLD}{color.BLUE}{executable}{color.END} {rest}", flush=True)
[docs]
def reverse_sign(x):
"""reverse_sign
Inverts the sign of given values.
Parameters
----------
x : int, float, list, numpy.ndarray
Input value(s) to be inverted.
Returns
----------
Same type as input
Values with inverted sign.
Example
----------
.. code-block:: python
from lazyfmri.utils import reverse_sign
print(reverse_sign(5)) # -5
.. code-block:: python
print(reverse_sign(np.array([2, -2340, 2345, 123])))
# array([-2, 2340, -2345, -123])
"""
import numpy as np
inverted = ()
if isinstance(x, int) or isinstance(x, float) or isinstance(x, np.float32):
if x > 0:
inverted = -x
else:
inverted = abs(x)
elif isinstance(x, np.ndarray):
for i in x:
if float(i) > 0:
val = -float(i)
else:
val = abs(float(i))
inverted = np.append(inverted, val)
return inverted
[docs]
def convert_to_rgb(color, as_integer=False):
if isinstance(color, tuple):
(R, G, B) = color
elif isinstance(color, str):
if len(color) == 1:
color = mcolors.to_rgb(color)
else:
color = ImageColor.getcolor(color, "RGB")
(R, G, B) = color
if not as_integer:
rgb = []
for v in [R, G, B]:
if v > 1:
v /= 255
rgb.append(v)
R, G, B = rgb
else:
rgb = []
for v in [R, G, B]:
if v <= 1:
v = int(v*255)
rgb.append(v)
R, G, B = rgb
return (R, G, B)
[docs]
def decode(obj):
"""decode an object"""
if isinstance(obj, bytes):
obj = obj.decode()
return obj
[docs]
def copy_hdr(source_img, dest_img):
"""copy_hdr
Copy the header of one NIfTI image to another.
Parameters
----------
source_img : str or nibabel.Nifti1Image
Source image.
dest_img : str or nibabel.Nifti1Image
Target image.
Returns
----------
nibabel.Nifti1Image
Image with the header copied from the source.
Example
----------
.. code-block:: python
from lazyfmri.utils import copy_hdr
new_img = copy_hdr(img1, img2)
"""
if isinstance(source_img, nb.Nifti1Image):
src_img = source_img
elif isinstance(source_img, str):
src_img = nb.load(source_img)
if isinstance(dest_img, nb.Nifti1Image):
targ_img = dest_img
elif isinstance(dest_img, str):
targ_img = nb.load(dest_img)
new = nb.Nifti1Image(targ_img.get_fdata(), affine=src_img.affine, header=src_img.header)
return new
[docs]
def ants_truncate_intensities(
in_file,
out_file,
lower=0.01,
upper=0.99,
n_bins=256):
import nibabel as nb
import os
if not isinstance(in_file, str):
raise TypeError(f"Input must be a string pointing to a nifti file or a nb.Nifti1Image-object, not '{type(in_file)}'")
else:
dims = nb.load(in_file).header["dim"][0]
if not isinstance(out_file, str):
out_file = os.path.abspath(in_file)
raise TypeError(f"out_file must be a string, not '{type(out_file)}'")
else:
out_file = os.path.abspath(out_file)
cmd = f"ImageMath {dims} {out_file} TruncateImageIntensity {in_file} {lower} {upper} {n_bins}"
# print command if verb = True
print(cmd)
os.system(cmd)
return out_file
[docs]
def ants_to_spm_moco(affine, deg=False, convention="SPM"):
"""SPM output = x [LR], y [AP], z [SI], rx, ry, rz. ANTs employs an LPS system, so y value should be switched"""
dx, dy, dz = affine[9:]
if convention == "SPM":
dy = reverse_sign(dy)
rot_x = np.arcsin(affine[6])
cos_rot_x = np.cos(rot_x)
rot_y = np.arctan2(affine[7] / cos_rot_x, affine[8] / cos_rot_x)
rot_z = np.arctan2(affine[3] / cos_rot_x, affine[0] / cos_rot_x)
if deg:
rx,ry,rz = np.degrees(rot_x),np.degrees(rot_y),np.degrees(rot_z)
else:
rx,ry,rz = rot_x,rot_y,rot_z
moco_pars = np.array([dx,dy,dz,rx,ry,rz])
return moco_pars
[docs]
def make_chicken_csv(coord, input="ras", output_file=None, vol=0.343):
"""make_chicken_csv
Create a CSV file for ANTs transformation of coordinates.
Parameters
----------
coord : numpy.ndarray
Array containing (x, y, z) coordinates.
input : str, optional
Input coordinate system (`"ras"` or `"lps"`), by default `"ras"`.
output_file : str, optional
Output file path.
vol : float, optional
Voxel volume, by default 0.343.
Returns
----------
str
Path to the created CSV file.
Example
----------
.. code-block:: python
from lazyfmri.utils import make_chicken_csv
csv_file = make_chicken_csv(
np.array([-16.239, -67.23, -2.81]),
output_file="sub-001_space-fs.csv"
)
print(csv_file) # "sub-001_space-fs.csv"
"""
if len(coord) > 3:
coord = coord[:3]
if input.lower() == "ras":
# ras2lps
LPS = np.array([[-1,0,0],
[0,-1,0],
[0,0,1]])
coord = LPS @ coord
# rows = ["x,y,z,t,label,mass,volume,count", f"{coord[0]},{coord[1]},{coord[2]},0,1,1,{vol},1"]
with open(output_file, "w") as target:
writer = csv.writer(target, delimiter=",")
writer.writerow(["x","y","z","t","label","mass","volume","count"])
writer.writerow([coord[0],coord[1],coord[2],0,1,1,vol,1])
return output_file
[docs]
def read_chicken_csv(chicken_file, return_type="lps"):
"""read_chicken_csv
Function to get at least the coordinates from a csv file used with antsApplyTransformsToPoints.
(see https://github.com/stnava/chicken for the reason of this function's name)
Parameters
----------
chicken_file: str
path-like string pointing to an input file (.csv!)
return_type: str
specify the coordinate system that the output should be in
Returns
----------
numpy.ndarray
(3,) array containing the coordinate in `chicken_file`
Example
----------
read_chicken_csv("sub-001_space-fs_desc-lpi.csv")
"""
contents = pd.read_csv(chicken_file)
coord = np.squeeze(contents.iloc[:,0:3].values)
if return_type.lower() == "lps":
return coord
elif return_type.lower() == "ras":
# ras2lps
LPS = np.array([[-1,0,0],
[0,-1,0],
[0,0,1]])
return LPS@coord
[docs]
def make_binary_cm(color):
"""make_binary_cm
This function creates a custom binary colormap using matplotlib based on the RGB code specified. Especially useful if you
want to overlay in imshow, for instance. These RGB values will be converted to range between 0-1 so make sure you're
specifying the actual RGB-values. I like `https://htmlcolorcodes.com` to look up RGB-values of my desired color. The
snippet of code used here comes from https://kbkb-wx-python.blogspot.com/2015/12/python-transparent-colormap.html
Parameters
----------
<color>: tuple, str
either hex-code with (!!) '#' or a tuple consisting of:
* <R> int | red-channel (0-255)
* <G> int | green-channel (0-255)
* <B> int | blue-channel (0-255)
Returns
----------
matplotlib.colors.LinearSegmentedColormap object
colormap to be used with `plt.imshow`
Example
----------
.. code-block:: python
cm = make_binary_cm((232,255,0))
print(cm) # <matplotlib.colors.LinearSegmentedColormap at 0x7f35f7154a30>
.. code-block:: python
cm = make_binary_cm("#D01B47")
print(cm) # <matplotlib.colors.LinearSegmentedColormap at 0x7f35f7154a30>
"""
# convert input to RGB
R, G, B = convert_to_rgb(color)
colors = [(R, G, B, c) for c in np.linspace(0, 1, 100)]
cmap = mcolors.LinearSegmentedColormap.from_list('mycmap', colors, N=5)
return cmap
[docs]
class FindFiles:
"""FindFiles
Class to search for files in a directory based on a specified extension and optional filters.
This class allows recursive search with depth constraints and filtering based on filename patterns.
Parameters
----------
directory : str
Path to the directory where the search should be performed.
extension : str
File extension to search for (e.g., ".nii", ".csv").
exclude : str or list, optional
String or list of substrings to exclude from the search results.
maxdepth : int, optional
Maximum depth of recursion for searching subdirectories. If None, no limit is imposed.
filters : str or list, optional
String or list of substrings to filter the results. Only files matching these filters will be retained.
Attributes
----------
files : list
Sorted list of found files that match the search criteria.
Methods
----------
`find_files()`
Static method that yields files in a directory matching the given pattern.
"""
def __init__(self, directory, extension, exclude=None, maxdepth=None, filters=None):
"""Initialize FindFiles instance and perform file search."""
self.directory = directory
self.extension = extension
self.exclude = exclude
self.maxdepth = maxdepth
self.filters = filters
self.files = []
for filename in self.find_files(self.directory, f"*{self.extension}", maxdepth=self.maxdepth):
if not filename.startswith("._"):
self.files.append(filename)
self.files.sort()
if isinstance(self.exclude, (str, list)) or isinstance(self.filters, (list, str)):
if isinstance(self.filters, str):
self.filters = [self.filters]
elif isinstance(self.filters, list):
pass
else:
self.filters = []
self.files = get_file_from_substring(self.filters, self.files, exclude=self.exclude)
[docs]
@staticmethod
def find_files(directory, pattern, maxdepth=None):
directory = os.path.abspath(directory)
base_depth = directory.rstrip(os.sep).count(os.sep)
for root, dirs, files in os.walk(directory, followlinks=True):
depth = root.rstrip(os.sep).count(os.sep) - base_depth
if maxdepth is not None and depth > maxdepth:
dirs[:] = []
continue
if maxdepth is not None and depth >= maxdepth:
dirs[:] = []
for basename in files:
if fnmatch.fnmatch(basename, pattern):
yield os.path.join(root, basename)
[docs]
def find_missing(lst):
return [i for x, y in zip(lst, lst[1:])
for i in range(x + 1, y) if y - x > 1]
[docs]
def make_diverging_list(
values,
neg_color="#4b3b8f",
pos_color="#c24e00",
as_hex=True,
use_magnitude=True,
**kwargs
):
"""Create a diverging color list mapped to values.
Parameters
----------
values : array-like
Input values (e.g., effects)
neg_color : str
Color for negative values
pos_color : str
Color for positive values
as_hex : bool
Return hex colors (default True)
use_magnitude : bool
If True, assign gradient based on magnitude
Returns
-------
list
List of colors aligned with `values`
"""
import seaborn as sns
values = np.asarray(values)
neg_mask = values < 0
pos_mask = values >= 0
neg_vals = values[neg_mask]
pos_vals = values[pos_mask]
# create palettes
neg_palette = sns.light_palette(
neg_color,
n_colors=len(neg_vals),
reverse=True
)
pos_palette = sns.light_palette(
pos_color,
n_colors=len(pos_vals)
)
colors = np.empty(len(values), dtype=object)
if use_magnitude:
# sort within each side by magnitude
neg_order = np.argsort(neg_vals) # most negative → least
pos_order = np.argsort(pos_vals) # smallest → largest
for rank, idx in enumerate(neg_order):
orig_idx = np.where(neg_mask)[0][idx]
colors[orig_idx] = neg_palette[rank]
for rank, idx in enumerate(pos_order):
orig_idx = np.where(pos_mask)[0][idx]
colors[orig_idx] = pos_palette[rank]
else:
# assign in original order
neg_i = pos_i = 0
for i, v in enumerate(values):
if v < 0:
colors[i] = neg_palette[neg_i]
neg_i += 1
else:
colors[i] = pos_palette[pos_i]
pos_i += 1
if as_hex:
return [mcolors.rgb2hex(c) for c in colors]
else:
return list(colors)
[docs]
def make_diverging_cm(
neg_color,
pos_color,
neutral="#f2f2f2",
as_list=False,
**kwargs):
input_list = [neg_color, neutral, pos_color]
# scale to 0-1
col_list = []
for color in input_list:
scaled_color = convert_to_rgb(color)
col_list.append(scaled_color)
cm = mcolors.LinearSegmentedColormap.from_list("", col_list, **kwargs)
if as_list:
[mcolors.rgb2hex(cm(i / (cm.N - 1))) for i in range(cm.N)]
else:
return cm
[docs]
def make_between_cm(
col1,
col2,
as_list=False,
**kwargs):
input_list = [col1, col2]
# scale to 0-1
col_list = []
for color in input_list:
scaled_color = convert_to_rgb(color)
col_list.append(scaled_color)
cm = mcolors.LinearSegmentedColormap.from_list("", col_list, **kwargs)
if as_list:
return [mcolors.rgb2hex(cm(i)) for i in range(cm.N)]
else:
return cm
[docs]
def make_stats_cm(
direction,
lower_neg=(51, 0, 248),
upper_neg=(151, 253, 253),
lower_pos=(217, 36, 36),
upper_pos=(251, 255, 72),
invert=False,
):
if direction not in ["pos", "neg"]:
raise ValueError(
f"direction must be one of 'pos' or 'neg', not '{direction}'")
if direction == "pos":
input_list = [lower_pos, upper_pos]
else:
input_list = [lower_neg, upper_neg]
if invert:
input_list = input_list[::-1]
# scale to 0-1
col_list = []
for color in input_list:
scaled_color = convert_to_rgb(color)
col_list.append(scaled_color)
return mcolors.LinearSegmentedColormap.from_list("", col_list)
[docs]
def remove_files(path, string, ext=False):
"""remove_files
Remove files from a given path that containg a string as extension (`ext=True`), or at the
start of the file (`ext=False`)
Parameters
----------
path: str
path to the directory from which we need to remove files
string: str
tag for files we need to remove
ext: str, optional
only remove files containing `string` that end with `ext`
"""
files_in_directory = os.listdir(path)
if ext:
filtered_files = [
file for file in files_in_directory if file.endswith(string)]
else:
filtered_files = [
file for file in files_in_directory if file.startswith(string)]
for file in filtered_files:
path_to_file = os.path.join(path, file)
os.remove(path_to_file)
[docs]
def match_lists_on(ref_list, search_list, matcher="run"):
"""match_lists_on
Match two list based on a BIDS-specifier such as 'sub', 'run', etc. Can be any key that is extracted using :func:`linescanning.utils.split_bids_components`.
Parameters
----------
ref_list: list
List to use as reference
search_list: list
List to search for items in `ref_list`
matcher: str, optional
BIDS-identifier, by default "run"
Returns
----------
list
new `search_list` filtered for items in `ref_list`
Example
----------
>>> # Let's say I have functional files for 3 runs
>>> func_file
>>> ['sub-003_ses-3_task-SR_run-3_bold.mat',
>>> 'sub-003_ses-3_task-SR_run-4_bold.mat',
>>> 'sub-003_ses-3_task-SR_run-6_bold.mat']
>>> # and anatomical slices for 5 runs
>>> anat_slices
>>> ['sub-003_ses-3_acq-1slice_run-2_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-3_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-4_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-5_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-6_T1w.nii.gz']
>>> # I can then use `match_list_on` to find the anatomical slices corresponding to the functional files
>>> from linescanning import utils
>>> utils.match_lists_on(func_file, anat_slices, matcher='run')
>>> ['sub-003_ses-3_acq-1slice_run-3_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-4_T1w.nii.gz',
>>> 'sub-003_ses-3_acq-1slice_run-6_T1w.nii.gz']
"""
if isinstance(matcher, str):
matcher = [matcher]
new_list = []
for ii in ref_list:
comps = split_bids_components(ii)
# loop through elements in 'matcher' list
search_for = [f"{ii}-{comps[ii]}" for ii in matcher]
ff = get_file_from_substring(search_for, search_list, return_msg="None")
if ff != None:
if ff == search_list:
raise ValueError(f"Output list is equal to input list with identifier '{matcher}'. Please use unique identifier")
new_list.append(ff)
return new_list
[docs]
def filter_for_nans(array):
"""filter out NaNs from an array"""
if np.isnan(array).any():
return np.nan_to_num(array)
else:
return array
[docs]
def calculate_tsnr(data, ax):
mean_d = np.mean(data, axis=ax)
std_d = np.std(data, axis=ax)
tsnr = mean_d/std_d
tsnr[np.where(np.isinf(tsnr))] = np.nan
return tsnr
[docs]
def percent_change(
ts,
ax,
nilearn=False,
baseline=20,
prf=False,
dm=None
):
"""percent_change
Function to convert input data to percent signal change. Two options are current supported: the nilearn method
(`nilearn=True`), where the mean of the entire timecourse if subtracted from the timecourse, and the baseline method
(`nilearn=False`), where the median of `baseline` is subtracted from the timecourse.
Parameters
----------
ts: numpy.ndarray
Array representing the data to be converted to percent signal change. Should be of shape (n_voxels, n_timepoints)
ax: int
Axis over which to perform the conversion. If shape (n_voxels, n_timepoints), then ax=1.
If shape (n_timepoints, n_voxels), then ax=0.
nilearn: bool, optional
Use nilearn method, by default False
baseline: int, list, np.ndarray optional
Use custom method where only the median of the baseline (instead of the full timecourse) is subtracted, by default 20.
Length should be in `volumes`, not `seconds`. Can also be a list or numpy array (1d) of indices which are to be
considered as baseline. The list of indices should be corrected for any deleted volumes at the beginning.
Returns
----------
numpy.ndarray
Array with the same size as `ts` (voxels,time), but with percent signal change.
Raises
----------
ValueError
If `ax` > 2
"""
if ts.ndim == 1:
ts = ts[:, np.newaxis]
ax = 0
if prf:
from fmriproc import prf
# format data
if ts.ndim == 1:
ts = ts[..., np.newaxis]
# read design matrix
if isinstance(dm, str):
dm = prf.read_par_file(dm)
# calculate mean
avg = np.mean(ts, axis=ax)
ts *= (100/avg)
# find points with no stimulus
timepoints_no_stim = prf.baseline_from_dm(dm)
# find timecourses with no stimulus
if ax == 0:
med_bsl = ts[timepoints_no_stim, :]
else:
med_bsl = ts[:, timepoints_no_stim]
# calculat median over baseline
median_baseline = np.median(med_bsl, axis=ax)
# shift to zero
ts -= median_baseline
return ts
else:
if nilearn:
if ax == 0:
psc = signal._standardize(ts, standardize='psc')
else:
psc = signal._standardize(ts.T, standardize='psc').T
else:
# first step of PSC; set NaNs to zero if dividing by 0 (in case of crappy timecourses)
ts_m = ts * \
np.expand_dims(np.nan_to_num((100/np.mean(ts, axis=ax))), ax)
# get median of baseline
if isinstance(baseline, np.ndarray):
baseline = list(baseline)
if ax == 0:
if isinstance(baseline, list):
median_baseline = np.median(ts_m[baseline, :], axis=0)
else:
median_baseline = np.median(ts_m[:baseline, :], axis=0)
elif ax == 1:
if isinstance(baseline, list):
median_baseline = np.median(ts_m[:, baseline], axis=1)
else:
median_baseline = np.median(ts_m[:, :baseline], axis=1)
else:
raise ValueError("ax must be 0 or 1")
# subtract
psc = ts_m-np.expand_dims(median_baseline, ax)
return psc
[docs]
def split_bids_components(fname, entities=False, add_elements=None):
comp_list = fname.split('_')
comps = {}
ids = ['sub', 'ses', 'task', 'acq', 'rec', 'run',
'space', 'hemi', 'model', 'stage', 'desc', 'vox']
if add_elements is not None:
if isinstance(add_elements, str):
add_elements = [add_elements]
ids += add_elements
full_entities = [
"subject",
"session",
"task",
"reconstruction",
"acquisition",
"run"
]
for el in comp_list:
for i in ids:
if i in el:
comp = el.split('-')[-1]
if "." in comp:
ic = comp.index(".")
if ic > 0:
ex = 0
else:
ex = -1
comp = comp.split(".")[ex]
# if i == "run":
# comp = int(comp)
comps[i] = comp
if len(comps) != 0:
if entities:
return comps, full_entities
else:
return comps
else:
raise ValueError(f"Could not find any element of {ids} in {fname}")
[docs]
def get_ids(func_list, bids="task"):
ids = []
if isinstance(func_list, list):
for ff in func_list:
if isinstance(ff, str):
bids_comps = split_bids_components(ff)
if bids in list(bids_comps.keys()):
ids.append(bids_comps[bids])
if len(ids) > 0:
ids = list(np.unique(np.array(ids)))
return ids
else:
return []
[docs]
def str2operator(ops):
if ops in ["and", "&", "&&"]:
return operator.and_
elif ops in ["or", "|", "||"]:
return operator.or_
elif ops in ["is not", "!="]:
return operator.ne
elif ops in ["is", "==", "="]:
return operator.eq
elif ops in ["gt", ">"]:
return operator.gt
elif ops in ["lt", "<"]:
return operator.lt
elif ops in ["ge", ">="]:
return operator.ge
elif ops in ["le", "<="]:
return operator.le
elif ops in ["x", "*"]:
return operator.mul
elif ops == "/":
return operator.truediv
else:
raise NotImplementedError()
[docs]
def select_from_df(
df,
expression=None,
index=True,
indices=None,
match_exact=True
):
"""select_from_df
Selects a subset of a pandas DataFrame based on an expression, index-based selection, or column filtering.
This function provides flexible data selection methods:
- Using an `expression`, which consists of a column name and an operator (e.g., `'task = "rest"'`).
- Using `indices`, which selects columns based on a list, range, or numpy array.
- Optionally reindexing the output DataFrame.
Parameters
----------
df : pandas.DataFrame
The input dataframe.
expression : str, list, tuple, optional
The condition(s) to filter the DataFrame rows. Expressions should be in the format:
`"column operator value"` where `operator` can be one of:
`'='`, `'>'`, `'<'`, `'>='`, `'<='`, `'!='`.
Multiple expressions can be combined with `&` (logical AND).
Example: `"task = 'rest'"`, `"age > 30"`, `["task = 'rest'", "&", "age > 30"]`.
index : bool, optional
Whether to return the output DataFrame with the same indexing as `df`. Default is `True`.
indices : list, range, numpy.ndarray, optional
Specifies column indices to select from `df`. This can be:
- A `list` of column names or indices.
- A `range` object.
- A `numpy.ndarray` containing column indices.
If `indices` is specified, `expression` is ignored.
match_exact : bool, optional
If `indices` is a list of strings, determines how column names are matched:
- `True` (default): Exact matches only.
- `False`: Partial matches (i.e., column names that contain the given strings).
Returns
----------
pandas.DataFrame
A new DataFrame with rows/columns selected based on `expression` or `indices`.
Raises
----------
TypeError
If `indices` is not a tuple, list, or array.
ValueError
If an expression does not match any row in `df`.
Examples
----------
**Select rows based on an expression:**
.. code-block:: python
import pandas as pd
from lazyfmri.utils import select_from_df
# Create example DataFrame
data = {
"subject": [1, 2, 3, 4],
"age": [25, 30, 35, 40],
"task": ["rest", "active", "rest", "active"]
}
df = pd.DataFrame(data)
# Select rows where task is "rest"
df_filtered = select_from_df(df, expression="task = 'rest'")
print(df_filtered)
# Output:
# subject age task
# 0 1 25 rest
# 2 3 35 rest
**Select columns by index range:**
.. code-block:: python
df_filtered = select_from_df(df, indices=range(0, 2))
print(df_filtered)
# Output:
# subject age
# 0 1 25
# 1 2 30
# 2 3 35
# 3 4 40
**Select multiple conditions using AND (`&`):**
.. code-block:: python
df_filtered = select_from_df(df, expression=["task = 'rest'", "&", "age > 30"])
print(df_filtered)
# Output:
# subject age task
# 2 3 35 rest
"""
# not the biggest fan of a function within a function, but this allows
# easier translation of expressions/operators
def sort_expressions(expression):
expr_ = expression.split(" ")
if len(expr_) > 3:
for ix, i in enumerate(expr_):
try:
_ = str2operator(i)
break
except BaseException:
pass
col1 = " ".join(expr_[:ix])
val1 = " ".join(expr_[(ix + 1):])
operator1 = expr_[ix]
else:
col1, operator1, val1 = expr_
return col1, operator1, val1
if isinstance(indices, (list, tuple, np.ndarray)):
if isinstance(indices, tuple):
return df.iloc[:, indices[0]:indices[1]]
elif isinstance(indices, list):
if all(isinstance(item, str) for item in indices):
if match_exact:
return df[df.columns[df.columns.isin(indices)]]
else:
df_tmp = []
for item in indices:
df_tmp.append(
df[df.columns[df.columns.str.contains(item)]])
return pd.concat(df_tmp, axis=1)
else:
return df.iloc[:, indices]
elif isinstance(indices, np.ndarray):
return df.iloc[:, list(indices)]
else:
raise TypeError(
f"""Unknown type '{type(indices)}' for indices; must be a tuple of 2 values representing a range, or a
list/array of indices to select""")
else:
if not isinstance(expression, (str, tuple, list)):
raise ValueError(
f"""Please specify expressions to apply to the dataframe.
Input is '{expression}' of type ({type(expression)})"""
)
# fetch existing indices
idc = list(df.index.names)
if idc[0] is not None:
reindex = True
else:
reindex = False
# sometimes throws an error if you're trying to reindex a non-indexed
# dataframe
try:
df = df.reset_index()
except BaseException:
pass
sub_df = df.copy()
if isinstance(expression, str):
expression = [expression]
if isinstance(expression, (tuple, list)):
expressions = expression[::2]
operators = expression[1::2]
if len(expressions) == 1:
# find operator index
col1, operator1, val1 = sort_expressions(expressions[0])
# convert to operator function
ops1 = str2operator(operator1)
# use dtype of whatever dtype the colum is
search_value = np.array(
[val1], dtype=type(
sub_df[col1].values[0]))
sub_df = sub_df.loc[ops1(sub_df[col1], search_value[0])]
if len(expressions) == 2:
col1, operator1, val1 = sort_expressions(expressions[0])
col2, operator2, val2 = sort_expressions(expressions[1])
main_ops = str2operator(operators[0])
ops1 = str2operator(operator1)
ops2 = str2operator(operator2)
# check if we should interpret values invididually as integers
search_value1 = np.array(
[val1], dtype=type(
sub_df[col1].values[0]))[0]
search_value2 = np.array(
[val2], dtype=type(
sub_df[col2].values[0]))[0]
sub_df = sub_df.loc[main_ops(
ops1(sub_df[col1], search_value1), ops2(sub_df[col2], search_value2))]
# first check if we should do indexing
if index is not None:
# then check if we actually have something to index
if reindex:
if idc[0] is not None:
sub_df = sub_df.set_index(idc)
if sub_df.shape[0] == 0:
raise ValueError(
f"The following expression(s) resulted in an empty dataframe: {expression}")
return sub_df
[docs]
def multiselect_from_df(df, expression=[]):
if not isinstance(expression, list):
raise TypeError(
f"expression must be list of tuples (see docs utils.select_from_df), not {type(expression)}")
if len(expression) == 0:
raise ValueError("List is empty")
for expr in expression:
df = select_from_df(df, expression=expr)
return df
[docs]
def round_decimals_down(number: float, decimals: int = 2):
"""
Returns a value rounded down to a specific number of decimal places.
see: https://kodify.net/python/math/round-decimals/#round-decimal-places-up-and-down-round
"""
if not isinstance(decimals, int):
raise TypeError("decimal places must be an integer")
elif decimals < 0:
raise ValueError("decimal places has to be 0 or more")
elif decimals == 0:
return math.floor(number)
factor = 10 ** decimals
return math.floor(number * factor) / factor
[docs]
def round_decimals_up(number: float, decimals: int = 2):
"""
Returns a value rounded up to a specific number of decimal places.
see: https://kodify.net/python/math/round-decimals/#round-decimal-places-up-and-down-round
"""
if not isinstance(decimals, int):
raise TypeError("decimal places must be an integer")
elif decimals < 0:
raise ValueError("decimal places has to be 0 or more")
elif decimals == 0:
return math.ceil(number)
factor = 10 ** decimals
return math.ceil(number * factor) / factor
[docs]
def verbose(msg, verbose, flush=True, highlight=False, **kwargs):
if verbose:
if not highlight:
print(msg, flush=flush, **kwargs)
else:
print_cmd_highlight(msg)
[docs]
def get_unique_ids(df, id=None, sort=True, as_int=False, drop_na=True):
try:
df = df.reset_index()
except BaseException:
pass
if not isinstance(id, str):
raise ValueError("Please specify a identifier from the dataframe")
try:
a = df[id].values
if not sort:
indexes = np.unique(a, return_index=True)[1]
ret_list = [a[index] for index in sorted(indexes)]
else:
ret_list = list(np.unique(a))
# https://stackoverflow.com/a/50297200
if drop_na:
ret_list = [x for x in ret_list if x == x]
if as_int:
ret_list = [int(i) for i in ret_list]
return ret_list
except Exception:
raise RuntimeError(f"Could not find '{id}' in {list(df.columns)}")
[docs]
def pairwise(l1):
# Create an empty list 'temp' to store the pairs
temp = []
# Iterate through the list elements up to the second-to-last element
for i in range(len(l1) - 1):
# Get the current element and the next element in the list
current_element, next_element = l1[i], l1[i + 1]
# Create a tuple 'x' containing the current and next elements
x = (current_element, next_element)
# Append the tuple 'x' to the 'temp' list
temp.append(x)
# Return the list of pairs
return temp
[docs]
def get_file_from_substring(filt, path, return_msg='error', exclude=None):
"""get_file_from_substring
Finds and returns file(s) in a directory that contain specific substrings in their filenames.
Supports multiple search filters and optional exclusions.
This function scans a directory for filenames containing the specified substring(s) (`filt`).
It constructs a binary matrix where each row represents a file, and each column represents a filter.
A file is considered a match if all filters exist within its filename.
- If a **single file** matches, it returns the file path as a string.
- If **multiple files** match, it returns a list of file paths.
- If **no files** match, it raises an error or returns `None`, depending on `return_msg`.
Parameters
----------
filt : str or list of str
Substring(s) to match within filenames. Can be a single string or a list of multiple filters.
path : str or list
Path to the directory to search in. Can also be a list of filenames for direct filtering.
return_msg : str, optional
Behavior when no matching files are found:
- `'error'` (default): Raises `FileNotFoundError`.
- `None`: Returns `None` instead of raising an error.
exclude : str or list of str, optional
Substring(s) to exclude from matching files. Applied after filtering with `filt`.
Returns
----------
str
The path to the matching file if a single match is found.
list of str
A list of paths if multiple files match the filters.
None
If no matches are found and `return_msg=None`.
Raises
----------
FileNotFoundError
If no files match the specified filters and `return_msg='error'`.
Examples
----------
**Find a single file containing a substring:**
.. code-block:: python
from lazyfmri.utils import get_file_from_substring
# Search for a file containing "R2" in the /path/to/prf directory
file_path = get_file_from_substring("R2", "/path/to/prf")
print(file_path)
# Output: "/path/to/prf/r2.npy"
**Find a file matching multiple filters:**
.. code-block:: python
file_path = get_file_from_substring(["gauss", "best_vertices"], "path/to/pycortex/sub-xxx")
print(file_path)
# Output: "/path/to/pycortex/sub-xxx/sub-xxx_model-gauss_desc-best_vertices.csv"
**Find multiple matching files:**
.. code-block:: python
file_list = get_file_from_substring(["best_vertices"], "path/to/pycortex/sub-xxx")
print(file_list)
# Output:
# [
# "/path/to/pycortex/sub-xxx/sub-xxx_model-gauss_desc-best_vertices.csv",
# "/path/to/pycortex/sub-xxx/sub-xxx_model-norm_desc-best_vertices.csv"
# ]
**Exclude specific files from the results:**
.. code-block:: python
file_list = get_file_from_substring(["best_vertices"], "path/to/pycortex/sub-xxx", exclude="norm")
print(file_list)
# Output:
# [
# "/path/to/pycortex/sub-xxx/sub-xxx_model-gauss_desc-best_vertices.csv"
# ]
"""
input_is_list = False
if isinstance(filt, str):
filt = [filt]
if isinstance(exclude, str):
exclude = [exclude]
if isinstance(filt, list):
# list and sort all files in the directory
if isinstance(path, str):
files_in_directory = sorted(os.listdir(path))
elif isinstance(path, list):
input_is_list = True
files_in_directory = path.copy()
else:
raise ValueError(
"Unknown input type; should be string to path or list of files")
# the idea is to create a binary matrix for the files in 'path', loop
# through the filters, and find the row where all values are 1
filt_array = np.zeros((len(files_in_directory), len(filt)))
for ix, f in enumerate(files_in_directory):
for filt_ix, filt_opt in enumerate(filt):
filt_array[ix, filt_ix] = filt_opt in f
# now we have a binary <number of files x number of filters> array. If all filters were available in a file, the
# entire row should be 1,
# so we're going to look for those rows
full_match = np.ones(len(filt))
full_match_idc = np.where(np.all(filt_array == full_match, axis=1))[0]
if len(full_match_idc) == 1:
fname = files_in_directory[full_match_idc[0]]
if input_is_list:
return fname
else:
f = opj(path, fname)
if isinstance(exclude, list):
if not any(x in f for x in exclude):
return opj(path, fname)
else:
if return_msg == "error":
raise FileNotFoundError(
f"Could not find file with filters: {filt} and exclusion of [{exclude}] in '{path}'")
else:
return None
else:
return opj(path, fname)
elif len(full_match_idc) > 1:
match_list = []
for match in full_match_idc:
fname = files_in_directory[match]
if input_is_list:
match_list.append(fname)
else:
match_list.append(opj(path, fname))
if isinstance(exclude, list):
exl_list = []
for f in match_list:
if not any(x in f for x in exclude):
exl_list.append(f)
# return the string if there's only 1 element
if len(exl_list) == 1:
return exl_list[0]
else:
return exl_list
else:
return match_list
# return match_list
else:
if return_msg == "error":
raise FileNotFoundError(
f"Could not find file with filters: {filt} in {path}")
else:
return None
[docs]
def update_kwargs(kwargs, el, val, force=False):
if not force:
if el not in list(kwargs.keys()):
kwargs[el] = val
else:
kwargs[el] = val
return kwargs
[docs]
def find_nearest(array, value, return_nr=1):
"""find_nearest
Find the index and value in an array given a value. You can either choose to have 1 item (the `closest`) returned, or the
5 nearest items (`return_nr=5`), or everything you're interested in (`return_nr="all"`)
Parameters
----------
array: numpy.ndarray
array to search in
value: float
value to search for in `array`
return_nr: int, str, optional
number of elements to return after searching for elements in `array` that are close to `value`. Can either be an
integer or a string *all*
Returns
----------
int
integer representing the index of the element in `array` closest to `value`.
list
if `return_nr` > 1, a list of indices will be returned
numpy.ndarray
value in `array` at the index closest to `value`
"""
array = np.asarray(array)
if return_nr == 1:
idx = np.nanargmin((np.abs(array-value)))
return idx, array[idx]
else:
# check nan indices
nans = np.isnan(array)
# initialize output
idx = np.full_like(array, np.nan)
# loop through values in array
for qq, ii in enumerate(array):
# don't do anything if value is nan
if not nans[qq]:
idx[qq] = np.abs(ii-value)
# sort
idx = np.argsort(idx)
# return everything
if return_nr == "all":
idc_list = idx.copy()
else:
# return closest X values
idc_list = idx[:return_nr]
return idc_list, array[idc_list]
[docs]
def find_intersection(xx, curve1, curve2):
"""find_intersection
Find the intersection coordinates given two functions using `Shapely`.
Parameters
----------
xx: numpy.ndarray
array describing the x-axis values
curve1: numpy.ndarray
array describing the first curve
curve2: numpy.ndarray
array describing the first curve
Returns
----------
tuple
x,y coordinates where *curve1* and *curve2* intersect
Raises
----------
ValueError
if no intersection coordinates could be found
Example
----------
See https://fmriproc.readthedocs.io/en/latest/classes/prf.html#fmriproc.prf.SizeResponse.find_stim_sizes
"""
first_line = geometry.LineString(np.column_stack((xx, curve1)))
second_line = geometry.LineString(np.column_stack((xx, curve2)))
geom = first_line.intersection(second_line)
try:
if isinstance(geom, geometry.multipoint.MultiPoint):
# multiple coordinates
coords = [i.coords._coords for i in list(geom.geoms)]
elif isinstance(geom, geometry.point.Point):
# single coordinate
coords = [geom.coords._coords]
elif isinstance(geom, geometry.collection.GeometryCollection):
# coordinates + line segments
mapper = geometry.mapping(geom)
coords = []
for el in mapper["geometries"]:
coor = np.array(el["coordinates"])
if coor.ndim > 1:
coor = coor[0]
# to make indexing same as above
coords.append(coor[np.newaxis, ...])
else:
raise ValueError(f"Can't deal with output of type {type(geom)}")
# coords = geom
except Exception:
raise ValueError("Could not find intersection between curves..")
return coords
[docs]
def SDT(hits, misses, fas, crs):
"""
Computes Signal Detection Theory (SDT) measures, including d-prime (d'), beta, criterion (c),
and area under the d-prime distribution (Ad'), given the number of hits, misses, false alarms,
and correct rejections.
Parameters
----------
hits : int
Number of correct detections (signal present, response "yes").
misses : int
Number of missed detections (signal present, response "no").
fas : int
Number of false alarms (signal absent, response "yes").
crs : int
Number of correct rejections (signal absent, response "no").
Returns
-------
dict
A dictionary containing:
- `'d'` (float): d-prime, a measure of sensitivity.
- `'beta'` (float): Response bias (β), calculated as
.. code-block:: python
exp((Z(fa_rate)^2 - Z(hit_rate)^2) / 2).
- `'c'` (float): Criterion (c), calculated as:
.. code-block:: python
-(Z(hit_rate) + Z(fa_rate)) / 2.
- `'Ad'` (float): Area under the d-prime distribution, equivalent to `norm.cdf(d / sqrt(2))`.
- `'hit'` (float): Adjusted hit rate.
- `'fa'` (float): Adjusted false alarm rate.
Notes
-----
- To prevent infinite values in d-prime calculations, hit and false alarm rates of 0 or 1
are adjusted using a half-increment rule.
- `Z(x)` is the inverse cumulative distribution function (probit function) from the standard normal distribution.
- The function is useful in psychophysics and cognitive neuroscience to quantify detection sensitivity.
References
----------
- Green, D. M., & Swets, J. A. (1966). Signal detection theory and psychophysics.
- Macmillan, N. A., & Creelman, C. D. (2004). Detection theory: A user's guide.
Example
-------
>>> SDT(hits=50, misses=10, fas=5, crs=35)
{'d': 2.05, 'beta': 0.38, 'c': -0.1, 'Ad': 0.977, 'hit': 0.83, 'fa': 0.12}
"""
from scipy import stats
Z = stats.norm.ppf
# Floors an ceilings are replaced by half hits and half FA's
half_hit = 0.5 / (hits + misses)
half_fa = 0.5 / (fas + crs)
# Calculate hit_rate and avoid d' infinity
hit_rate = hits / (hits + misses)
if hit_rate == 1:
hit_rate = 1 - half_hit
if hit_rate == 0:
hit_rate = half_hit
# Calculate false alarm rate and avoid d' infinity
fa_rate = fas / (fas + crs)
if fa_rate == 1:
fa_rate = 1 - half_fa
if fa_rate == 0:
fa_rate = half_fa
# Return d', beta, c and Ad'
out = {}
out['d'] = Z(hit_rate) - Z(fa_rate)
out['beta'] = math.exp((Z(fa_rate)**2 - Z(hit_rate)**2) / 2)
out['c'] = -(Z(hit_rate) + Z(fa_rate)) / 2
out['Ad'] = stats.norm.cdf(out['d'] / math.sqrt(2))
out['hit'] = hit_rate
out['fa'] = fa_rate
return(out)
[docs]
def run_shell_wrapper(cmd, msg=None, verb=False):
"""
Execute a shell command with optional verbose logging and error handling.
This function runs a shell command using subprocess, capturing output and printing
informative messages. If `msg` is provided, it will be printed before executing.
If the command fails, both stdout and stderr will be printed for debugging.
Parameters
----------
cmd : str
The full shell command to execute.
msg : str, optional
Message to print before executing the command.
verb : bool, default=False
If True, echo the command being run with highlighting.
Returns
-------
None
Notes
-----
- Uses `shell=True`, so ensure `cmd` is properly sanitized.
- Automatically captures and displays stdout/stderr on failure.
- Intended as a user-friendly wrapper for external CLI tools.
Example
-------
run_shell_wrapper("fslmaths input.nii.gz -abs output.nii.gz", msg="Running FSL abs")
"""
import sys
import subprocess
if isinstance(msg, str):
verbose(msg, True)
try:
filt_cmd = " ".join(cmd.split())
verbose(filt_cmd, verb, highlight=True)
_ = subprocess.run(
filt_cmd,
shell=True,
check=True,
stdout=sys.stdout,
stderr=sys.stderr,
text=True
)
except subprocess.CalledProcessError as e:
print(f"{color.RED}{color.BOLD}ERROR{color.END}: Command '{e.cmd}' failed with exit code {e.returncode}")
sys.exit(e.returncode) # ← clean exit without traceback
except Exception as e:
print(f"{color.RED}{color.BOLD}Unexpected error{color.END}: {str(e)}")
sys.exit(1)
[docs]
def launch_itksnap(*args):
"""
Launch ITK-SNAP to view/edit images and wait for the GUI to close.
Parameters
----------
*args : str
Arbitrary command-line arguments to pass to ITK-SNAP (e.g., -g <image> -s <segmentation>)
Notes
-----
- Uses `open -W` on macOS to block until ITK-SNAP closes.
- On Linux, subprocess waits by default.
- On Windows, uses shell execution to block until ITK-SNAP closes.
"""
import subprocess
system = platform.system()
if system == "Darwin":
# macOS: open app in blocking mode
cmd = ["open", "-W", "-a", "ITK-SNAP", "--args"] + list(args)
elif system == "Windows":
# Windows: execute directly with shell
cmd = ["itksnap"] + list(args)
cmd = " ".join(cmd) # pass as string when using shell=True
else:
# Linux/Unix: use Popen and wait
cmd = ["itksnap"] + list(args)
# Print command
verbose(" ".join(cmd) if isinstance(cmd, list) else cmd, True, highlight=True)
# Run command
if system == "Darwin":
subprocess.run(cmd)
elif system == "Windows":
subprocess.run(cmd, shell=True)
else:
proc = subprocess.Popen(cmd)
proc.wait()
[docs]
def parse_kwargs_to_dict(input_list, delimiter="="):
"""
Parse a string or list of key-value pairs into a dictionary.
Supports automatic type conversion for booleans, numbers, and lists.
Parameters
----------
input_list : str
A single key=value pair (e.g., "alpha=0.01") or a comma-separated
string of such pairs (e.g., "alpha=0.01,beta=0.5,debug=True").
Values can be lists separated by '|', e.g., "layers=conv|pool|fc".
delimiter : str, optional
The character used to separate keys from values, by default "=".
Returns
-------
dict
Dictionary of parsed key-value pairs with appropriate types.
Examples
--------
>>> parse_kwargs_to_dict("alpha=0.01,beta=0.5,debug=True")
{'alpha': 0.01, 'beta': 0.5, 'debug': True}
>>> parse_kwargs_to_dict("layers=conv|pool|fc")
{'layers': ['conv', 'pool', 'fc']}
>>> parse_kwargs_to_dict("use_dropout=False,epochs=10")
{'use_dropout': False, 'epochs': 10}
"""
if "," in input_list:
kwargs_in_list = [i.strip() for i in input_list.split(",")]
else:
kwargs_in_list = [input_list.strip()]
kwargs_in_dict = {}
for item in kwargs_in_list:
if delimiter not in item:
continue # skip invalid entries
key, val = item.split(delimiter, 1)
# Handle lists (pipe-separated)
if "|" in val:
val_list = val.split("|")
parsed_vals = []
for v in val_list:
parsed_vals.append(_autotype(v))
kwargs_in_dict[key] = parsed_vals
else:
kwargs_in_dict[key] = _autotype(val)
return kwargs_in_dict
def _autotype(val):
"""Helper to convert string to bool, int, float, or str."""
val = val.strip()
if val.lower() == "true":
return True
elif val.lower() == "false":
return False
elif re.match(r"^-?\d+$", val):
return int(val)
elif re.match(r"^-?\d*\.\d+$", val):
return float(val)
return val