from . import plotting, utils
from nilearn.glm.first_level import first_level
from nilearn.glm.first_level import hemodynamic_models
from nilearn.plotting import (
plot_contrast_matrix,
)
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
[docs]
class GenericGLM():
"""GenericGLM
Main class to perform a simple GLM with python. Will do most of the processes internally, and allows you to plot various
processes along the way.
Parameters
----------
onset: pandas.DataFrame
Dataframe containing the onset times for all events in an experiment. Specifically design to work smoothly with
:class:`lazyfmri.dataset.ParseExpToolsFile`. You should insert the output from
:func:`lazyfmri.dataset.ParseExpToolsFile.get_onset_df()` as `onset`
data: numpy.ndarray, pandas.DataFrame
<time,voxels> numpy array or pandas DataFrame; required for creating the appropriate length of the stimulus vectors
hrf_pars: dict, optional
dictionary collecting the parameters required for :func:`lazyfmri.glm.double_gamma` (generally the defaults are fine
though!)
.. code-block:: python
pars = {
'lag': 6,
'a2': 12,
'b1': 12,
'b2': 12,
'c': 12,
'scale': True
}
TR: float
repetition time of acquisition
osf: int, optional
Oversampling factor used to account for decimal onset times, by default None. The larger this factor, the more
accurate decimal onset times will be processed, but also the bigger your upsampled convolved becomes, which means
convolving will take longer.
type: str, optional
Use block design of event-related design, by default 'event'. If set to 'block', `block_length` is required.
block_length: int, optional
Duration of block in seconds, by default None
amplitude: int, list, optional
Amplitude to be used when creating the stimulus vector, by default None. If nothing is specified, the amplitude will
be set to '1', like you would in a regular FSL 1-/3-column file. If you want variable amplitudes for different events
for in a simulation, you can specify a list with an equal length to the number of events present in `onset_df`.
regressors: pandas.DataFrame, numpy.ndarray, optional
Add a bunch of regressors to the design
make_figure: bool, optional
Create overview figure of HRF, stimulus vector, and convolved stimulus vector, by default False
scan_length: int
number of volumes in `data` (= `scan_length` in :func:`lazyfmri.glm.make_stimulus_vector`)
xkcd: bool, optional
Plot the figre in XKCD-style (cartoon), by default False
plot_vox: int, optional
Instead of plotting the best-fitting voxel, specify which voxel to plot the timecourse and fit of, by default None
plot_event: str, int, list, optional
If a larger design matrix was inputted with multiple events, you can specify here the name of the event you'd like to
plot the betas from. It also accepts a list of indices of events to plot, so you could plot the first to events by
specifying `plot_event=[1,2]`. Remember, the 0th index is the intercept! By default we'll plot the event right after
the intercept
contrast_matrix: numpy.ndarray, optional
contrast array for the event regressors. If none, we'll create a contrast matrix that estimates the effect of each
regressor and the baseline
nilearn: bool, optional
use nilearn implementation of `FirstLevelModel` (True) or bare python (False). The later gives easier access to betas,
while the former allows implementation of AR-noise models.
Returns
----------
dict
Dictionary collecting outputs under the following keys
- "betas": <n_regressors (+intercept), n_voxels> beta values
- "tstats": <n_regressors (+intercept), n_voxels> t-statistics (FSL-way)
- "x_conv": <n_timepoints, n_regressors (+intercept)> design matrix
- "resids": <n_timepoints, n_voxels> residuals>
matplotlib.pyplot
plots along the process if `make_figure=True`
Example
----------
.. code-block:: python
# import modules
from lazyfmri.glm import GenericGLM
from lazyfmri import dataset
# define file with fMRI-data and the output from Exptools2
func_file = "some_func_file.mat"
exp_file = "some_exp_file.tsv"
# load in functional data
obj = dataset.Dataset(
func_file,
exp_file=exp_file,
subject=1,
run=1,
deleted_first_timepoints=200,
deleted_last_timepoints=200)
# fetch HP-filtered, percent-signal changed data
data = obj.fetch_fmri()
onsets = obj.fetch_onsets()
# do the fitting
fitting = GenericGLM(
onsets,
data.values,
TR=func.TR,
osf=1000
)
"""
def __init__(
self,
onsets,
data,
hrf_pars=None,
TR=None,
osf=1,
contrast_matrix=None,
exp_type='event',
block_length=None,
amplitude=None,
regressors=None,
make_figure=False,
xkcd=False,
plot_design=False,
plot_convolve=False,
plot_event=None,
plot_vox=None,
plot_fit=False,
verbose=False,
nilearn=False,
derivative=False,
dispersion=False,
add_intercept=True,
fit=True,
copes=None,
cmap='inferno',
kw_conv={}
):
# instantiate
self.onsets = onsets
self.hrf_pars = hrf_pars
self.TR = TR
self.osf = osf
self.exp_type = exp_type
self.block_length = block_length
self.amplitude = amplitude
self.regressors = regressors
self.make_figure = make_figure
self.xkcd = xkcd
self.plot_event = plot_event
self.plot_vox = plot_vox
self.verbose = verbose
self.contrast_matrix = contrast_matrix
self.nilearn_method = nilearn
self.dispersion = dispersion
self.derivative = derivative
self.add_intercept = add_intercept
self.regressors = regressors
self.cmap = cmap
self.run_fit = fit
self.copes = copes
self.plot_fit = plot_fit
self.plot_design = plot_design
self.plot_convolve = plot_convolve
self.kw_conv = kw_conv
if isinstance(data, np.ndarray):
self.data = data.copy()
elif isinstance(data, pd.DataFrame):
self.orig = data.copy()
self.data = data.values
else:
raise ValueError("Data must be 'np.ndarray' or 'pandas.DataFrame'")
# define HRF
self.hrf = self.define_hrf()
# make the stimulus vectors
self.stims = self.define_stimvector()
# convolve stimulus vectors
self.stims_convolved = self.convolve_stims(**self.kw_conv)
# resample
self.stims_convolved_resampled = self.resample_stimvector()
# get condition names
self.condition_names = list(self.stims_convolved_resampled.keys())
[docs]
def define_hrf(self):
utils.verbose(
f"Defining HRF with option '{self.hrf_pars}'",
self.verbose)
return define_hrf(
hrf_pars=self.hrf_pars,
osf=self.osf,
TR=self.TR,
dispersion=self.dispersion,
derivative=self.derivative
)
[docs]
def define_stimvector(self):
utils.verbose("Creating stimulus vector(s)", self.verbose)
return make_stimulus_vector(
self.onsets,
scan_length=self.data.shape[0],
osf=self.osf,
type=self.exp_type,
TR=self.TR,
block_length=self.block_length,
amplitude=self.amplitude
)
[docs]
def convolve_stims(self, **kwargs):
utils.verbose("Convolve stimulus vectors with HRF", self.verbose)
return convolve_hrf(
self.hrf,
self.stims,
TR=self.TR,
osf=self.osf,
make_figure=self.plot_convolve,
xkcd=self.xkcd,
**kwargs
)
[docs]
def resample_stimvector(self):
if self.osf > 1:
utils.verbose("Resample convolved stimulus vectors", self.verbose)
return resample_stim_vector(
self.stims_convolved, self.data.shape[0])
else:
return self.stims_convolved.copy()
[docs]
def create_design(self, make_figure=False):
# finalize design matrix (with regressors)
utils.verbose("Creating design matrix", self.verbose)
self.design = first_level_matrix(
self.stims_convolved_resampled,
regressors=self.regressors,
add_intercept=self.add_intercept
)
if make_figure:
self.plot_design_matrix()
[docs]
def fit(
self,
nilearn_method=False,
make_figure=False,
xkcd=False,
plot_vox=None,
plot_event=None,
cmap="inferno",
copes=None,
plot_full_only=False,
plot_full=False,
save_as=None,
**kwargs
):
self.nilearn_method = nilearn_method
self.xkcd = xkcd
self.plot_vox = plot_vox
self.plot_event = plot_event
self.cmap = cmap
self.copes = copes
self.make_figure = make_figure
self.plot_full_only = plot_full_only
self.plot_full = plot_full
self.save_as = save_as
if self.nilearn_method:
# we're going to hack Nilearn's FirstLevelModel to be compatible
# with our line-data. First, we specify the model as usual
self.fmri_glm = first_level.FirstLevelModel(
t_r=self.TR,
noise_model='ar1',
standardize=False,
hrf_model='spm',
drift_model='cosine',
high_pass=.01)
# Normally, we'd run `fmri_glm = fmri_glm.fit()`, but because this
# requires nifti-like inputs, we run `run_glm` outside of that
# function to get the labels:
if isinstance(self.data, pd.DataFrame):
self.data = self.data.values
elif isinstance(self.data, np.ndarray):
self.data = self.data.copy()
else:
raise ValueError(
f"""Unknown input type {type(self.data)} for functional data. Must be pd.DataFrame or np.ndarray
[time, voxels]""")
self.labels, self.results = first_level.run_glm(
self.data, self.design, noise_model='ar1')
# Then, we inject this into the `fmri_glm`-class so we can compute
# contrasts
self.fmri_glm.labels_ = [self.labels]
self.fmri_glm.results_ = [self.results]
# insert the design matrix:
self.fmri_glm.design_matrices_ = []
self.fmri_glm.design_matrices_.append(self.design)
# Then we specify our contrast matrix:
if self.contrast_matrix is None:
if self.verbose:
print("Defining standard contrast matrix")
matrix = np.eye(len(self.condition_names))
icept = np.zeros((len(self.condition_names), 1))
matrix = np.hstack((icept, matrix)).astype(int)
self.contrast_matrix = matrix.copy()
self.conditions = {}
for idx, name in enumerate(self.condition_names):
self.conditions[name] = self.contrast_matrix[idx, ...]
if self.verbose:
print("Computing contrasts")
self.tstats = []
for event in self.conditions:
tstat = self.fmri_glm.compute_contrast(
self.conditions[event],
stat_type='t',
output_type='stat',
return_type=None)
self.tstats.append(tstat)
self.tstats = np.array(self.tstats)
else:
if not isinstance(self.plot_event, (str, list)):
self.plot_event = self.condition_names
self.results = fit_first_level(
self.design,
self.data,
make_figure=self.make_figure,
xkcd=self.xkcd,
plot_vox=self.plot_vox,
plot_event=self.plot_event,
cmap=self.cmap,
verbose=self.verbose,
copes=self.copes,
plot_full_only=self.plot_full_only,
plot_full=self.plot_full,
add_intercept=self.add_intercept,
save_as=self.save_as,
**kwargs
)
[docs]
def plot_contrast_matrix(self, save_as=None):
if self.nilearn_method:
cols = list(self.design.columns)
fig, axs = plt.subplots(figsize=(len(cols), 10))
plot_contrast_matrix(
self.contrast_matrix,
design_matrix=self.design,
ax=axs)
plotting.conform_ax_to_obj(axs)
fig, axs = plt.subplots(figsize=(10, 10))
plot_contrast_matrix(
self.contrast_matrix,
design_matrix=self.design,
ax=axs)
if save_as:
fig.savefig(save_as)
else:
raise NotImplementedError(
"Can't use this function without nilearn-fitting. Set 'nilearn=True'")
[docs]
def plot_design_matrix(self, save_as=None):
cols = list(self.design.columns)
fig, axs = plt.subplots(figsize=(len(cols), 10))
plotting.plot_design_matrix(self.design, ax=axs)
plotting.conform_ax_to_obj(axs)
if save_as:
fig.savefig(save_as)
[docs]
def glover_hrf(osf=1, TR=0.105, dispersion=False,
derivative=False, time_length=25):
# osf factor is different in `hemodynamic_models`
# osf /= 10
# set kernel
hrf_kernel = []
hrf = hemodynamic_models.glover_hrf(
TR, oversampling=osf, time_length=time_length)
hrf /= hrf.max()
hrf_kernel.append(hrf)
if derivative:
tderiv_hrf = hemodynamic_models.glover_time_derivative(
tr=TR, oversampling=osf, time_length=time_length)
tderiv_hrf /= tderiv_hrf.max()
hrf_kernel.append(tderiv_hrf)
if dispersion:
tdisp_hrf = hemodynamic_models.glover_dispersion_derivative(
TR, oversampling=osf, time_length=time_length)
tdisp_hrf /= tdisp_hrf.max()
hrf_kernel.append(tdisp_hrf)
return hrf_kernel
[docs]
def spm_hrf(osf=1, TR=0.105, dispersion=False,
derivative=False, time_length=25):
# osf factor is different in `hemodynamic_models`
# osf /= 10
# set kernel
hrf_kernel = []
hrf = hemodynamic_models.spm_hrf(
TR, oversampling=osf, time_length=time_length)
hrf /= hrf.max()
hrf_kernel.append(hrf)
if derivative:
tderiv_hrf = hemodynamic_models.spm_time_derivative(
tr=TR, oversampling=osf, time_length=time_length)
tderiv_hrf /= tderiv_hrf.max()
hrf_kernel.append(tderiv_hrf)
if dispersion:
tdisp_hrf = hemodynamic_models.spm_dispersion_derivative(
TR, oversampling=osf, time_length=time_length)
tdisp_hrf /= tdisp_hrf.max()
hrf_kernel.append(tdisp_hrf)
return hrf_kernel
[docs]
def make_stimulus_vector(
onset_df,
scan_length=None,
cov_as_ampl=None,
TR=0.105,
osf=1,
type='event',
block_length=None,
amplitude=None):
"""make_stimulus_vector
Creates a stimulus vector for each of the conditions found in `onset_df`. You can account for onset times being in decimal
using the oversampling factor `osf`. This would return an upsampled stimulus vector which should be convolved with an
equally upsampled HRF. This can be ensured by using the same `osf` in :func:`lazyfmri.glm.double_gamma`.
Parameters
----------
onset_df: pandas.DataFrame
onset times as read in with :class:`lazyfmri.dataset.ParseExpToolsFile`
scan_length: float, optional
length of the , by default None
TR: float, optional
Repetition time, by default 0.105. Will be used to calculate the required length of the stimulus vector
osf: [type], optional
Oversampling factor used to account for decimal onset times, by default None
type: str, optional
Use block design of event-related design, by default 'event'. If set to 'block', `block_length` is required.
block_length: int, optional
Duration of block in seconds, by default None
amplitude: int, list, optional
Amplitude to be used when creating the stimulus vector, by default None. If nothing is specified, the amplitude will
be set to '1', like you would in a regular FSL 1-/3-column file. If you want variable amplitudes for different events
for in a simulation, you can specify a list with an equal length to the number of events present in `onset_df`.
Returns
----------
dict
Dictionary collecting numpy array stimulus vectors for each event present in `onset_df` under the keys <event name>
Raises
----------
ValueError
`onset_df` should contain event names
ValueError
if multiple amplitudes are requested but the length of `amplitude` does not match the number of events
ValueError
`block_length` should be an integer
Example
----------
.. code-block:: python
from lazyfmri import utils, glm
exp_file = 'path/to/exptools2_file.tsv'
exp_df = utilsParseExpToolsFile(exp_file, subject=1, run=1)
times = exp_df.get_onset_df()
# oversample with factor 1000 to get rid of 3 decimals in onset times
osf = 1000
# make stimulus vectors
stims = glm.make_stimulus_vector
times,
scan_length=400,
osf=osf,
type='event'
)
.. code-block:: python
# stims
{
'left': array([0., 0., 0., ..., 0., 0., 0.]),
'right': array([0., 0., 0., ..., 0., 0., 0.])
}
"""
# check if we should reset or not
try:
onset_df = onset_df.reset_index()
except BaseException:
onset_df = onset_df
# check conditions we have
try:
names_cond = utils.get_unique_ids(onset_df, id="event_type")
except BaseException:
raise ValueError(
"""Could not extract condition names; are you sure you formatted the dataframe correctly? Mostly likely you passed
the functional dataframe as onset dataframe. Please switch""")
# check if we should use covariate as amplitude
if isinstance(cov_as_ampl, (bool, list)):
if isinstance(cov_as_ampl, bool):
cov_as_ampl = [cov_as_ampl for _ in names_cond]
# check if we got multiple amplitudes
if isinstance(amplitude, np.ndarray):
ampl_array = amplitude
elif isinstance(amplitude, list):
ampl_array = np.array(amplitude)
else:
ampl_array = False
# loop through unique conditions
stim_vectors = {}
for idx, condition in enumerate(names_cond):
tmp_onsets = utils.select_from_df(
onset_df, expression=f"event_type = {condition}")
set_ampl = True
if isinstance(cov_as_ampl, list):
if cov_as_ampl[idx]:
if "cov" in list(tmp_onsets.columns):
ampl = tmp_onsets["cov"].values
set_ampl = False
if set_ampl:
if isinstance(ampl_array, np.ndarray):
if ampl_array.shape[0] == names_cond.shape[0]:
ampl = amplitude[idx]
print(
f"Amplitude for event '{names_cond[idx]}' = {round(ampl,2)}")
else:
raise ValueError(
f"""Nr of amplitudes ({ampl_array.shape[0]}) does not match number of conditions
({names_cond.shape[0]})""")
else:
ampl = 1
Y = np.zeros(int((scan_length * osf)))
if type == "event":
for rr, ii in enumerate(tmp_onsets['onset']):
try:
if isinstance(ampl, (list, np.ndarray)):
use_ampl = ampl[rr]
else:
use_ampl = ampl
Y[int((ii * osf) / TR)] = use_ampl
except BaseException:
print(
f"""Warning: could not include event {rr} with t = {ii}. Probably experiment continued after
functional acquisition""")
elif type == 'block':
for rr, ii in enumerate(tmp_onsets['onset']):
Y[int((ii * osf) / TR):int(((ii + block_length) * osf) / TR)] = ampl
else:
raise ValueError(f"Event must be 'event' or 'block', not '{type}'")
stim_vectors[condition] = Y
return stim_vectors
[docs]
def define_hrf(
hrf_pars="glover",
TR=0.105,
osf=1,
dispersion=False,
derivative=False):
if isinstance(hrf_pars, str):
if hrf_pars == "glover":
hrf = glover_hrf(
osf=osf,
TR=TR,
dispersion=dispersion,
derivative=derivative)
elif hrf_pars == "spm":
hrf = spm_hrf(
osf=osf,
TR=TR,
dispersion=dispersion,
derivative=derivative)
else:
raise ValueError(
f"""Invalid option '{hrf_pars}' specified. Must be either 'spm' or 'glover', a list of 3 parameters, a numpy
array describing the HRF, or None (for standard double gamma)""")
elif isinstance(hrf_pars, np.ndarray):
if hrf_pars.ndim == 1:
hrf = [hrf_pars]
else:
hrf = [arr for arr in hrf_pars]
elif isinstance(hrf_pars, list):
hrf = np.array(
[
np.ones_like(hrf_pars[1]) * hrf_pars[0] *
hemodynamic_models.spm_hrf(
tr=TR,
oversampling=osf,
time_length=40)[..., np.newaxis],
hrf_pars[1] *
hemodynamic_models.spm_time_derivative(
tr=TR,
oversampling=osf,
time_length=40)[..., np.newaxis],
hrf_pars[2] *
hemodynamic_models.spm_dispersion_derivative(
tr=TR,
oversampling=osf,
time_length=40)[..., np.newaxis]]).sum(
axis=0)
hrf = [np.squeeze(hrf)]
else:
osf /= TR
dt = 1 / osf
time_points = np.linspace(0, 25, np.rint(float(25) / dt).astype(int))
hrf = [double_gamma(time_points, lag=6)]
return hrf
[docs]
def convolve_hrf(
hrf,
stim_v,
TR=1,
osf=1,
make_figure=False,
xkcd=False,
time=None,
*args,
**kwargs):
"""convolve_hrf
Convolve :func:`lazyfmri.glm.double_gamma` with :func:`lazyfmri.glm.make_stimulus_vector`. There's an option to plot the
result in a nice overview figure, though python-wise it's not the prettiest..
Parameters
----------
hrf: numpy.ndarray
HRF across given timepoints with shape (,`x.shape[0]`)
stim_v: numpy.ndarray, list
Stimulus vector as per :func:`lazyfmri.glm.make_stimulus_vector` or numpy array containing one stimulus vector (e.g.,
a *key* from :func:`lazyfmri.glm.make_stimulus_vector`)
TR: float
repetition time of acquisition
make_figure: bool, optional
Create overview figure of HRF, stimulus vector, and convolved stimulus vector, by default False
scan_length: int
number of volumes in `data` (= `scan_length` in :func:`lazyfmri.glm.make_stimulus_vector`)
xkcd: bool, optional
Plot the figre in XKCD-style (cartoon), by default False
add_array1: numpy.ndarray, optional
additional stimulus vector to be plotted on top of `stim_v`, by default None
add_array2: numpy.ndarray, optional
additional **convolved** stimulus vector to be plotted on top of `stim_v`, by default None
regressors: pandas.DataFrame
add a bunch of regressors with shape <time,voxels> to the design matrix. Should be in the dimensions of the functional
data, not the oversampled..
Returns
----------
matplotlib.plot
if `make_figure=True`, a figure will be displayed
pandas.DataFrame
if `osf > 1`, then resampled stimulus vector DataFrame is returned. If not, the convolved stimulus vectors are
returned in a dataframe as is
Example
----------
.. code-block:: python
from lazyfmri.glm import convolve_hrf
convolved_stim_vector_left = convolve_hrf(hrf_custom, stims, make_figure=True, xkcd=True) # creates figure too
convolved_stim_vector_left = convolve_hrf(hrf_custom, stims) # no figure
"""
def plot(
stim_v,
hrf,
convolved,
osf=1,
ev=None,
xkcd=False,
time=time,
**kwargs):
fig = plt.figure(figsize=(20, 6))
gs = fig.add_gridspec(2, 2, width_ratios=[20, 10], hspace=0.7)
if isinstance(hrf, np.ndarray):
hrf = [hrf]
if stim_v.min() < 0:
y_lim = [-1, 1]
y_ticks = [-1, 0, 1]
else:
y_lim = [-0.5, 1]
y_ticks = [0, 1]
ax0 = fig.add_subplot(gs[0, 0])
plotting.LazyLine(
stim_v,
color="#B1BDBD",
axs=ax0,
title=f"Events ('{ev}')",
y_lim=y_lim,
x_label='scan volumes (* osf)',
y_label='magnitude (a.u.)',
xkcd=xkcd,
font_size=16,
y_ticks=y_ticks,
*args,
**kwargs)
# print(list(convolved.T)[0].shape)
ax1 = fig.add_subplot(gs[1, 0])
plotting.LazyLine(
list(convolved.T),
axs=ax1,
title="Convolved stimulus-vector",
x_label='scan volumes (* osf)',
y_label='magnitude (a.u.)',
*args,
**kwargs)
ax2 = fig.add_subplot(gs[:, 1])
if not isinstance(time, (list, np.ndarray)):
time = list(((np.arange(0, hrf[0].shape[0])) / osf) * TR)
labels = [
"HRF",
"1st derivative",
"2nd derivative"
]
plotting.LazyLine(
hrf,
xx=time,
axs=ax2,
title="HRF",
x_label='Time (s)',
labels=labels[:len(hrf)],
*args,
**kwargs)
# check hrf input
if isinstance(hrf, np.ndarray):
hrfs = [hrf]
elif isinstance(hrf, list):
hrfs = hrf.copy()
else:
raise ValueError(
f"Unknown input type '{type(hrf)}' for HRF. Must be list or array")
# convolve stimulus vectors
if isinstance(stim_v, np.ndarray):
if len(hrfs) >= 1:
convolved_stim_vector = np.zeros((stim_v.shape[0], len(hrfs)))
for ix, rf in enumerate(hrfs):
convolved_stim_vector[:, ix] = np.convolve(
stim_v, rf, 'full')[:stim_v.shape[0]]
if make_figure:
plot(stim_v, hrfs, convolved_stim_vector, xkcd=xkcd)
plt.show()
elif isinstance(stim_v, dict):
if len(hrfs) >= 1:
convolved_stim_vector = {}
for event in list(stim_v.keys()):
hrf_conv = np.zeros((stim_v[event].shape[0], len(hrf)))
for ix, rf in enumerate(hrfs):
hrf_conv[..., ix] = np.convolve(stim_v[event], rf, 'full')[
:stim_v[event].shape[0]]
convolved_stim_vector[event] = hrf_conv
if make_figure:
if xkcd:
with plt.xkcd():
plot(
stim_v[event],
hrfs,
convolved_stim_vector[event],
osf=osf,
ev=event,
time=time,
*args,
**kwargs)
else:
plot(
stim_v[event],
hrfs,
convolved_stim_vector[event],
ev=event,
time=time,
osf=osf,
*args,
**kwargs)
else:
raise ValueError("Data must be 'np.ndarray' or 'dict'")
return convolved_stim_vector
[docs]
def resample_stim_vector(convolved_array, scan_length, interpolate='nearest'):
"""resample_stim_vector
Resample the oversampled stimulus vector back in to functional time domain
Parameters
----------
convolved_array: dict, numpy.ndarray
oversampled convolved stimulus vector as per :func:`lazyfmri.glm.convolve_hrf`
scan_length: int
number of volumes in `data` (= `scan_length` in :func:`lazyfmri.glm.make_stimulus_vector`)
interpolate: str, optional
interpolation method, by default 'nearest'
Returns
----------
dict, numpy.ndarray
convolved stimulus vector in time domain that matches the fMRI acquisition
Example
----------
.. code-block:: python
from lazyfmri.glm import resample_stim_vector
scan_length = 230
convolved_stim_vector_left_ds = resample_stim_vector(convolved_stim_vector_left, scan_length)
"""
if isinstance(convolved_array, np.ndarray):
interpolated = interp1d(
np.arange(
len(convolved_array)),
convolved_array,
kind=interpolate,
axis=0,
fill_value='extrapolate')
downsampled = interpolated(
np.linspace(
0,
len(convolved_array),
scan_length))
elif isinstance(convolved_array, dict):
downsampled = {}
for event in list(convolved_array.keys()):
event_arr = convolved_array[event]
if event_arr.shape[-1] > 1:
tmp = np.zeros((scan_length, event_arr.shape[-1]))
for elem in range(event_arr.shape[-1]):
data = event_arr[..., elem]
interpolated = interp1d(
np.arange(len(data)), data, kind=interpolate, axis=0, fill_value='extrapolate')
tmp[..., elem] = interpolated(
np.linspace(0, len(data), scan_length))
downsampled[event] = tmp
else:
interpolated = interp1d(
np.arange(
len(
convolved_array[event])),
convolved_array[event],
kind=interpolate,
axis=0,
fill_value='extrapolate')
downsampled[event] = interpolated(np.linspace(
0, len(convolved_array[event]), scan_length))
else:
raise ValueError("Data must be 'np.ndarray' or 'dict'")
return downsampled
def get_event_prediction(
ev,
X,
betas,
ev_names,
include_intercept=False,
include_derivatives=True,
rf_mode=False,
):
"""
Get prediction for one event by selecting matching design columns and betas.
Parameters
----------
ev : str
Event name to match in `col_names`, e.g. "CSm", "CSpr", "CSpu".
X : ndarray
Convolved design matrix, shape (n_timepoints, n_regressors).
betas : ndarray
Beta vector, shape (n_regressors,) or compatible.
col_names : list of str
Column names corresponding to columns in `X_conv`.
include_intercept : bool
Whether to always include the intercept column.
include_regressor : bool
Whether to include columns containing "regressor".
Returns
-------
ev_preds : ndarray
Predicted signal for this event.
event : ndarray
Event-specific design matrix.
betas : ndarray
Event-specific beta values.
beta_idx : list[int]
Selected column indices.
"""
if rf_mode:
include_intercept = False
beta_idx = [
ix for ix, name in enumerate(ev_names)
if (
name == ev
or (include_derivatives and name.startswith(f"{ev}_"))
or (include_intercept and name == "intercept")
)
]
ev_betas = np.asarray(betas)[beta_idx].squeeze()
if rf_mode:
if X.shape[1] != ev_betas.shape[0]:
raise ValueError(
f"{ev}: RF basis has {X.shape[1]} columns, "
f"but selected {ev_betas.shape[0]} betas: "
f"{[ev_names[i] for i in beta_idx]}"
)
ev_preds = X @ ev_betas
return ev_preds, X, ev_betas, beta_idx
event = X[:, beta_idx]
ev_preds = event @ ev_betas
return ev_preds, event, ev_betas, beta_idx
[docs]
def get_event_prediction(
ev,
X,
betas,
ev_names,
rf_mode=False,
include_intercept=True,
include_derivatives=True,
):
"""
Get prediction for one event by selecting matching design columns and betas.
Parameters
----------
ev : str
Event name to match in `col_names`, e.g. "CSm", "CSpr", "CSpu".
X : ndarray
Convolved design matrix, shape (n_timepoints, n_regressors).
betas : ndarray
Beta vector, shape (n_regressors,) or compatible.
col_names : list of str
Column names corresponding to columns in `X_conv`.
include_intercept : bool
Whether to always include the intercept column.
include_derivatives : bool
Whether to include columns containing "regressor" representing the derivatives.
rf_mode : bool
If True, interpret X as basis set and return RF.
Returns
-------
ev_preds : ndarray
Predicted signal for this event.
event : ndarray
Event-specific design matrix.
betas : ndarray
Event-specific beta values.
beta_idx : list[int]
Selected column indices.
"""
# --- select relevant betas ---
beta_idx = [
ix for ix, name in enumerate(ev_names)
if (
name == ev
or (include_derivatives and name.startswith(f"{ev}_"))
or (include_intercept and name == "intercept")
)
]
ev_betas = betas[beta_idx]
print(ev_betas.shape)
# --- RF mode: X is basis set ---
if rf_mode:
# IMPORTANT: here X should already be the basis set for THIS event
ev_preds = X @ ev_betas
return ev_preds, X, ev_betas, beta_idx
# --- standard mode: X is design matrix ---
event = X[:, beta_idx]
ev_preds = event @ ev_betas
return ev_preds, event, ev_betas, beta_idx
[docs]
def first_level_matrix(
stims_dict,
regressors=None,
add_intercept=True,
names=None
):
# make dataframe of stimulus vectors
if isinstance(stims_dict, np.ndarray):
if names:
stims = pd.DataFrame(stims_dict, columns=names)
else:
stims = pd.DataFrame(stims_dict, columns=[
f'event {ii}' for ii in range(stims_dict.shape[-1])])
elif isinstance(stims_dict, dict):
# check if we got time/dispersion derivatives
cols = []
data = []
keys = list(stims_dict.keys())
for key in keys:
if stims_dict[key].shape[-1] == 1:
cols.extend([key])
elif stims_dict[key].shape[-1] == 2:
cols.extend([key, f'{key}_1st_derivative'])
elif stims_dict[key].shape[-1] == 3:
cols.extend([key, f'{key}_1st_derivative',
f'{key}_2nd_derivative'])
data.append(stims_dict[key])
data = np.concatenate(data, axis=-1)
stims = pd.DataFrame(data, columns=cols)
else:
raise ValueError("Data must be 'np.ndarray' or 'dict'")
# check if we should add intercept
if add_intercept:
intercept = np.ones((stims.shape[0], 1))
intercept_df = pd.DataFrame(intercept, columns=['intercept'])
X_matrix = pd.concat([intercept_df, stims], axis=1)
else:
X_matrix = stims.copy()
# check if we should add regressors
if isinstance(regressors, np.ndarray):
regressors_df = pd.DataFrame(regressors, columns=[
f'regressor_{ii+1}' for ii in range(regressors.shape[-1])])
return pd.concat([X_matrix, regressors_df], axis=1)
elif isinstance(regressors, dict):
regressors_df = pd.DataFrame(regressors)
return pd.concat([X_matrix, regressors_df], axis=1)
elif isinstance(regressors, pd.DataFrame):
return pd.concat([X_matrix, regressors], axis=1)
else:
return X_matrix
[docs]
def get_event_predictions(events, X, betas, ev_names, **kwargs):
return {
ev: get_event_prediction(
ev=ev,
X=X,
betas=betas,
ev_names=ev_names,
**kwargs,
)[0]
for ev in events
}
[docs]
def fit_first_level(
stim_vector,
data,
make_figure=False,
copes=None,
plot_vox=None,
plot_event=1,
verbose=False,
cmap='inferno',
plot_full_only=False,
plot_full=False,
add_intercept=True,
axs=None,
figsize=(16, 4),
**kwargs):
"""fit_first_level
First level models are, in essence, linear regression models run at the level of a single session or single subject. The
model is applied on a voxel-wise basis, either on the whole brain or within a region of interest. The timecourse of each
voxel is regressed against a predicted BOLD response created by convolving the haemodynamic response function (HRF) with a
set of predictors defined within the design matrix (source: https://nilearn.github.io/glm/first_level_model.html)
Parameters
----------
stim_vector: pandas.DataFrame, numpy.ndarray
either the output from :func:`lazyfmri.glm.resample_stim_vector` (convolved stimulus vector in fMRI-acquisition time
domain) or a pandas.DataFrame containing the full design matrix as per the output of
:func:`lazyfmri.glm.first_level_matrix`.
data: numpy.ndarray
<time,voxels> numpy array; same input as **data** from :func:`lazyfmri.glm.make_stimulus_vector`
make_figure: bool, optional
Create a figure of best-voxel fit, by default False
copes: [type], optional
[description], by default None
xkcd: bool, optional
Plot the figre in XKCD-style (cartoon), by default False
plot_vox: int, optional
Instead of plotting the best-fitting voxel, specify which voxel to plot the timecourse and fit of, by default None
plot_event: str, int, list, optional
If a larger design matrix was inputted with multiple events, you can specify here the name of the event you'd like to
plot the betas from. It also accepts a list of indices of events to plot, so you could plot the first to events by
specifying `plot_event=[1,2]`. Remember, the 0th index is the intercept! By default we'll plot the event right after
the intercept
Returns
----------
numpy.ndarray
betas for each voxel for the intercept and the number of stim_vectors used (in case you also add regressors)
numpy.ndarray
the design matrix `X_conv`
Example
----------
.. code-block:: python
from lazyfmri.glm import fit_first_level
# plots first event
betas_left,x_conv_left = fit_first_level(
convolved_stim_vector_left_ds,
data,
make_figure=True
)
.. code-block:: python
# plots first two events
betas_left,x_conv_left = fit_first_level(
convolved_stim_vector_left_ds,
data,
make_figure=True,
plot_events=[1,2]
)
"""
utils.verbose("Running GLM", verbose)
if isinstance(data, (pd.DataFrame, pd.Series)):
data = data.values
# add intercept if input is simple numpy array.
if isinstance(stim_vector, np.ndarray):
if stim_vector.ndim == 1:
stim_vector = stim_vector[:, np.newaxis]
if data.ndim == 1:
data = data[:, np.newaxis]
if stim_vector.shape[0] != data.shape[0]:
stim_vector = stim_vector[:data.shape[0], :]
# create design matrix with intercept
if add_intercept:
intercept = np.ones((data.shape[0], 1))
intercept_df = pd.DataFrame(intercept, columns=['intercept'])
X_matrix = pd.concat([intercept_df, stim_vector], axis=1)
else:
X_matrix = pd.DataFrame(stim_vector, axis=1)
else:
# everything should be ready to go if 'first_level_matrix' was used
X_matrix = stim_vector.copy()
if data.ndim == 1:
data = data[:, np.newaxis]
if stim_vector.shape[0] != data.shape[0]:
raise ValueError(
f"""Unequal shapes between design {stim_vector.shape} and data {data.shape}. Make sure first elements have the
same shape""")
X_conv = X_matrix.values.copy()
# set default to first predictor
if not isinstance(copes, (int, list, np.ndarray)):
copes = 1
# convert to array
if isinstance(copes, int):
C = np.zeros((X_conv.shape[1]))
C[copes] = 1
elif isinstance(copes, list):
C = np.array(copes, dtype=int)
else:
C = copes.copy()
# enforce integer
C = C.astype(int)
good_shape = np.zeros((C.shape[0], X_conv.shape[1]))
utils.paste(good_shape, C)
C = good_shape.copy()
# force into 2d
if C.ndim == 1:
C = C[np.newaxis, ...]
# np.linalg.pinv(X) = np.inv(X.T @ X) @ X
betas_conv, sse, rank, s = np.linalg.lstsq(X_conv, data, rcond=-1)
# betas_conv = np.linalg.inv(X_conv.T @ X_conv) @ X_conv.T @ data
# get full model predictions
preds = X_conv @ betas_conv
# calculate r2
r2 = calculate_r2(data, sse=sse)
# loop through contrasts
tstat = calculate_tstats(
dm=X_conv,
C=C,
betas=betas_conv,
sse=sse,
rank=rank
)
if verbose:
for co_ix in range(C.shape[0]):
if tstat[co_ix].shape[0] < 10:
print(f"t-stat {C[co_ix,:]}: {tstat[co_ix]}")
def best_voxel(r2):
# get max over voxels and find max
return np.where(r2 == r2.max())[0][0]
if not plot_vox:
best_vox = best_voxel(r2)
else:
best_vox = plot_vox
if betas_conv.ndim == 1:
betas_conv = betas_conv[..., np.newaxis]
if make_figure:
if not isinstance(axs, mpl.axes._axes.Axes):
fig, axs = plt.subplots(figsize=figsize)
# set defaults for actual datapoints
markers = ['.']
colors = ["#cccccc"]
linewidth = [0.5]
col_names = X_matrix.columns.to_list()
if not plot_full_only:
# make list so we can loop
if isinstance(plot_event, str):
plot_event = [plot_event]
else:
if isinstance(plot_event, int):
plot_event = [col_names[plot_event]]
signals = [data[:, best_vox]]
labels = ['True signal']
for ev in plot_event:
ev_preds, _, _, _ = get_event_prediction(
ev=ev,
X=X_conv,
betas=betas_conv,
ev_names=col_names,
)
signals.append(ev_preds[:, best_vox])
labels.append(f"Event '{ev}'")
markers.append(None)
linewidth.append(2)
colors = [*colors, *sns.color_palette(cmap, len(plot_event))]
else:
plot_full = True
# append full model
if plot_full:
signals.append(preds[:, best_vox])
labels.append("full model")
markers.append(None)
linewidth.append(1)
colors.append("k")
if "title" not in list(kwargs.keys()):
kwargs = utils.update_kwargs(
kwargs,
"title",
f"model fit vox {best_vox+1}/{data.shape[1]} (r2={round(r2[best_vox],4)})",
)
_ = plotting.LazyLine(
signals,
y_label="Activity (a.u.)",
x_label="volumes",
labels=labels,
axs=axs,
markers=markers,
color=colors,
line_width=linewidth,
**kwargs)
return {
'betas': betas_conv,
'x_conv': X_conv,
'tstats': tstat,
'r2': r2,
'copes': C,
'dm': X_matrix
}
[docs]
def calculate_r2(
data,
sse
):
tse = (data.shape[0] - 1) * np.var(data, axis=0, ddof=1)
r2 = 1 - (sse / tse)
return r2
[docs]
def calculate_tstats(
dm=None,
C=None,
betas=None,
sse=None,
rank=None
):
tstat = []
for co_ix in range(C.shape[0]):
# contrast-specific vector
cope_c = C[co_ix, :]
# calculate some other relevant parameters
cope = cope_c @ betas
dof = dm.shape[0] - rank
sigma_hat = sse / dof
varcope = sigma_hat * design_variance(dm, which_predictor=cope_c)
# calculate t-stats
t_ = cope / np.sqrt(varcope)
tstat.append(t_)
if len(tstat) > 0:
tstat = np.array(tstat)
return tstat
[docs]
def design_variance(X, which_predictor=1):
''' Returns the design variance of a predictor (or contrast) in X.
Parameters
----------
X : numpy array
Array of shape (N, P)
which_predictor : int or list/array
The index of the predictor you want the design var from.
Note that 0 refers to the intercept!
Alternatively, "which_predictor" can be a contrast-vector
(which will be discussed later this lab).
Returns
-------
des_var : float
Design variance of the specified predictor/contrast from X.
'''
is_single = isinstance(which_predictor, int)
if is_single:
idx = which_predictor
else:
idx = np.array(which_predictor) != 0
c = np.zeros(X.shape[1])
c[idx] = 1 if is_single == 1 else which_predictor[idx]
des_var = c.dot(np.linalg.inv(X.T.dot(X))).dot(c.T)
return des_var
[docs]
def double_gamma(x, lag=6, a2=12, b1=0.9, b2=0.9, c=0.35, scale=True):
"""double_gamma
Create a double gamma hemodynamic response function (HRF).
Parameters
----------
x: numpy.ndarray
timepoints along the HRF
lag: int, optional
duration until peak of HRF is reached, by default 6
a2: int, optional
second determinant of the HRF drop, by default 12
b1: float, optional
first determinant of HRF rise, by default 0.9
b2: float, optional
second determinant of HRF rise, by default 0.9
c: float, optional
constant for HRF drop, by default 0.35
scale: bool, optional
normalize course of HRF, by default True
Returns
----------
numpy.ndarray
HRF across given timepoints with shape (,`x.shape[0]`)
Example
----------
.. code-block:: python
dt = 1
time_points = np.linspace(0,36,np.rint(float(36)/dt).astype(int))
hrf_custom = lazyfmri.glm.double_gamma(time_points, lag=6)
hrf_custom = hrf_custom[np.newaxis,...]
"""
a1 = lag
d1 = a1 * b1
d2 = a2 * b2
hrf = np.array([(t / (d1))**a1 * np.exp(-(t - d1) / b1) - c *
(t / (d2))**a2 * np.exp(-(t - d2) / b2) for t in x])
if scale:
hrf = (1 - hrf.min()) * (hrf - hrf.min()) / \
(hrf.max() - hrf.min()) + hrf.min()
return hrf
[docs]
class Posthoc(plotting.Defaults):
"""Posthoc
Initializes the `Posthoc` class for conducting posthoc statistical tests following an ANOVA or ANCOVA analysis.
This class is designed for simple pairwise comparisons using `pingouin.pairwise_tukey()` or `pingouin.pairwise_tests()`.
It also provides visualization support for significance bars on a given matplotlib axis.
Parameters
----------
**kwargs : dict
Additional parameters that will be passed to :class:`lazyfmri.plotting.Defaults()`, allowing customization of plotting settings.
Example
----------
.. code-block:: python
from lazyfmri import glm
posth = glm.Posthoc()
posth.run_posthoc(
data=df,
dv="dependent_variable",
between="grouping_variable"
)
posth.plot_bars(axs=axs)
"""
def __init__(self, **kwargs):
# initialize plotting setting
super().__init__(**kwargs)
[docs]
def sort_posthoc(self, df):
"""sort_posthoc
Sorts the output of posthoc tests based on the distance between compared conditions. The function ensures that
the longest significance bar spans the largest distance in the plot.
Parameters
----------
df : pd.DataFrame
Dataframe containing posthoc test results, with columns `"A"` and `"B"` indicating the conditions being compared.
Returns
-------
pd.DataFrame
The sorted dataframe with an additional `"distances"` column indicating the distance between compared conditions.
Example
----------
.. code-block:: python
sorted_df = posth.sort_posthoc(posth.posthoc)
print(sorted_df)
"""
distances = []
for contr in range(df.shape[0]):
A = df["A"].iloc[contr]
B = df["B"].iloc[contr]
x1 = self.conditions.index(A)
x2 = self.conditions.index(B)
distances.append(abs(x2 - x1))
df["distances"] = distances
return df.sort_values("distances", ascending=False)
[docs]
def run_posthoc(
self,
test: str = "tukey",
ano: dict = None,
paired=False,
*args,
**kwargs):
"""run_posthoc
Runs the posthoc test. By default, a Tukey test is executed (`"tukey"`), but other tests from `pingouin.pairwise_tests()`
can be used. The function supports both parametric and non-parametric comparisons.
Parameters
----------
test : str, optional
Type of posthoc test to execute. Default is `"tukey"`. If another value is provided, `pingouin.pairwise_tests()` is used.
ano : dict, optional
Dictionary containing ANOVA results. If provided, posthoc p-values can be inherited from the ANOVA output.
paired : bool, optional
If `True`, assumes a paired comparison (e.g., within-subject analysis). Default is `False`.
*args : tuple
Additional positional arguments for the `pingouin` posthoc functions.
**kwargs : dict
Additional keyword arguments for the `pingouin` posthoc functions.
Raises
------
ImportError
If `pingouin` is not installed.
Example
----------
.. code-block:: python
posth.run_posthoc(
data=df,
dv="response",
between="condition",
test="tukey"
)
print(posth.posthoc) # Print posthoc test results
"""
self.test = test
# internalize data
if "data" in list(kwargs.keys()):
self.data = kwargs["data"]
try:
import pingouin as pg
except BaseException:
raise ImportError("Could not import 'pingouin'")
# FDR-corrected post hocs
nonparam = False
kwargs["parametric"] = True
if "within" in list(kwargs.keys()):
nonparam = True
kwargs["parametric"] = False
if paired:
if "between" in list(kwargs.keys()):
kwargs["within"] = kwargs["between"]
kwargs.pop("between")
if paired or nonparam:
self.test = "test"
if self.test == "tukey":
self.posthoc = pg.pairwise_tukey(
*args,
**kwargs
)
self.p_tag = "p-tukey"
else:
self.posthoc = pg.pairwise_tests(
*args,
**kwargs)
if "p-corr" in list(self.posthoc.columns):
self.p_tag = "p-corr"
else:
self.p_tag = "p-unc"
tagger = "between"
if paired or nonparam:
tagger = "within"
if self.test == "inherit":
if len(ano) > 0:
if kwargs[tagger] in list(ano.keys()):
self.posthoc[self.p_tag] = ano[kwargs[tagger]]
# internalize all kwargs
self.__dict__.update(kwargs)
self.conditions = utils.get_unique_ids(
self.data, id=getattr(self, tagger), sort=False)
[docs]
def plot_bars(
self,
axs: mpl.axes._axes.Axes = None,
alpha: float = 0.05,
y_pos: float = 0.95,
line_separate_factor: float = -0.065,
ast_frac: float = 0.2,
ns_annot: bool = False,
ns_frac: bool = 5,
leg_size: float = 0.02,
color: str = "black",
*args,
**kwargs):
"""plot_bars
Plots significance bars on a matplotlib axis based on sorted posthoc results. The function determines significance
levels using asterisks (`"*"` for p < 0.05, `"**"` for p < 0.01, `"***"` for p < 0.001) and positions bars accordingly.
Parameters
----------
axs : mpl.axes._axes.Axes, optional
Axis to plot significance bars on. Default is `None`.
alpha : float, optional
Significance threshold. Default is `0.05`.
y_pos : float, optional
Starting position of the top significance line in axis proportions (1 = top of plot). Default is `0.95`.
This factor is reduced incrementally using `line_separate_factor`.
line_separate_factor : float, optional
Factor by which subsequent significance bars are shifted downward. Default is `-0.065`.
ast_frac : float, optional
Distance between significance line and annotation (e.g., asterisks or `"ns"` for non-significance). Default is `0.2`.
ns_annot : bool, optional
If `True`, non-significant contrasts are annotated with `"ns"`. Default is `False`.
ns_frac : float, optional
Additional factor to scale the distance between significance lines and `"ns"` annotations. Default is `5`.
leg_size : float, optional
Size of the overhang from the significance bars, defined as a fraction of the total y-axis limit. Default is `0.02`.
color : str, optional
Color of significance bars. Default is `"black"`.
*args : tuple
Additional arguments.
**kwargs : dict
Additional keyword arguments.
Example
----------
.. code-block:: python
fig, ax = plt.subplots()
posth.plot_bars(axs=ax, alpha=0.05, y_pos=1.1, color="red")
"""
# internalize args
self.axs = axs
self.alpha = alpha
self.y_pos = y_pos
self.line_separate_factor = line_separate_factor
self.ast_frac = ast_frac
self.ns_annot = ns_annot
self.ns_frac = ns_frac
self.leg_size = leg_size
self.color = color
# run posthoc if not present
if not hasattr(self, "posthoc"):
self.run_posthoc(*args, **kwargs)
# internalize all kwargs
self.__dict__.update(kwargs)
self.minmax = list(self.axs.get_ylim())
# sort posthoc so that bars furthest away are on top (if significant)
self.posthoc_sorted = self.sort_posthoc(self.posthoc)
if self.p_tag in list(self.posthoc_sorted.columns):
p_meth = self.p_tag
else:
p_meth = "p-unc"
for contr in range(self.posthoc_sorted.shape[0]):
txt = None
p_val = self.posthoc_sorted[p_meth].iloc[contr]
if p_val < self.alpha:
if 0.01 < p_val < 0.05:
txt = "*"
elif 0.001 < p_val < 0.01:
txt = "**"
elif p_val < 0.001:
txt = "***"
style = None
f_size = self.font_size
dist = self.ast_frac
else:
if self.ns_annot:
txt = "ns"
style = "italic"
f_size = self.label_size
dist = self.ast_frac * self.ns_frac
# read indices from output dataframe and conditions
if isinstance(txt, str):
A = self.posthoc_sorted["A"].iloc[contr]
B = self.posthoc_sorted["B"].iloc[contr]
x1 = self.conditions.index(A)
x2 = self.conditions.index(B)
diff = self.minmax[1] - self.minmax[0]
y, h, col = (diff * self.y_pos) + \
self.minmax[0], diff * self.leg_size, self.color
self.axs.plot(
[x1, x1, x2, x2],
[y, y + h, y + h, y],
lw=self.tick_width,
c=col
)
x_txt = (x1 + x2) * .5
y_txt = y + (y * dist)
# print(y,h)
self.axs.text(
x_txt,
y_txt,
txt,
ha='center',
va='bottom',
color=col,
fontname=self.fontname,
fontsize=f_size,
style=style
)
# make subsequent bar lower than first
self.y_pos += self.line_separate_factor
# reset txt
txt = None
[docs]
class ANOVA(Posthoc):
"""ANOVA
Runs an ANOVA using `pingouin` and subsequently performs posthoc tests. This class allows for immediate visualization
of significant results using a Matplotlib axis. Unlike :class:`lazyfmri.glm.Posthoc`, arguments for the ANOVA test
are provided at initialization. If `covar` is specified, an ANCOVA is run instead of an ANOVA.
Parameters
----------
alpha : float, optional
Alpha value to determine statistical significance. Default is `0.05`.
axs : mpl.axes._axes.Axes, optional
Matplotlib axis on which to plot significance bars. Default is `None`.
posthoc_kw : dict, optional
Dictionary of arguments passed to :func:`lazyfmri.glm.Posthoc.plot_bars()`. Default is `{}`.
plot_kw : dict, optional
Additional keyword arguments for customizing the plot. Default is `{}`.
bar_kw : dict, optional
Additional keyword arguments for customizing the significance bars. Default is `{}`.
*args : tuple
Additional positional arguments for `pingouin.anova()` or `pingouin.ancova()`.
**kwargs : dict
Additional keyword arguments for `pingouin.anova()` or `pingouin.ancova()`.
Example
-------
.. code-block:: python
from lazyfmri import glm
# Run an ANOVA on a dataset
aov = glm.ANOVA(
data=df,
dv="dependent_variable",
between="grouping_variable",
posthoc_kw={
"effsize": "cohen",
"test": "t-test",
"paired": True,
"subject": "vox",
"padjust": "holm"
}
)
# If using LazyBar, add significance bars
aov.plot_bars(
axs=bar.axs,
ast_frac=0,
y_pos=1.15,
line_separate_factor=-0.075
)
"""
def __init__(
self,
alpha: float = 0.05,
axs: mpl.axes._axes.Axes = None,
posthoc_kw: dict = {},
plot_kw={},
bar_kw={},
*args,
**kwargs):
self.posthoc_kw = posthoc_kw
self.bar_kw = bar_kw
self.plot_kw = plot_kw
self.alpha = alpha
self.run_anova(
alpha=self.alpha,
posthoc_kw=self.posthoc_kw,
bar_kw=self.bar_kw,
axs=axs,
plot_kw=self.plot_kw,
*args,
**kwargs
)
def _get_results(
self,
df: pd.DataFrame,
alpha: float = 0.05):
"""Extract significant p-values from an ANOVA table.
Identifies significant effects from the ANOVA results dataframe by checking if p-values
are below the defined `alpha` threshold.
Parameters
----------
df : pd.DataFrame
Dataframe containing ANOVA results with at least a `"Source"` and `"p-unc"` column.
alpha : float, optional
Significance threshold for identifying significant effects. Default is `0.05`.
Returns
-------
dict
Dictionary where keys are significant effects and values are their respective p-values.
Example
-------
.. code-block:: python
results = aov._get_results(aov.ano, alpha=0.05)
print(results) # {'Group': 0.002, 'Condition': 0.035}
"""
effects = df["Source"].to_list()
p_vals = {}
for ef in effects:
p_ = df.loc[(df["Source"] == ef)]["p-unc"].values[0]
if p_ < alpha:
p_vals[ef] = p_
return p_vals
[docs]
def run_anova(
self,
alpha: float = 0.05,
axs: mpl.axes._axes.Axes = None,
parametric="auto",
posthoc_kw=None,
plot_kw=None,
bar_kw=None,
*args,
**kwargs):
"""Runs an ANOVA or ANCOVA analysis.
Uses the `pingouin` package to perform an ANOVA, automatically choosing between parametric (ANOVA/ANCOVA)
and non-parametric (Friedman test) based on normality testing.
Parameters
----------
alpha : float, optional
Significance level for statistical tests. Default is `0.05`.
axs : mpl.axes._axes.Axes, optional
Matplotlib axis on which to plot significance bars. Default is `None`.
parametric : str or bool, optional
Determines whether to run a parametric or non-parametric test:
- `"auto"` (default): Tests normality and selects automatically.
- `True`: Runs a parametric ANOVA.
- `False`: Runs a non-parametric Friedman test.
posthoc_kw : dict, optional
Dictionary of arguments for posthoc testing. Default is `{}`.
plot_kw : dict, optional
Additional parameters for customizing plots. Default is `{}`.
bar_kw : dict, optional
Parameters for significance bars. Default is `{}`.
args : tuple
Additional positional arguments for `pingouin.anova()` or `pingouin.ancova()`.
kwargs : dict
Additional keyword arguments for `pingouin.anova()` or `pingouin.ancova()`.
Raises
------
ImportError
If `pingouin` is not installed.
Example
-------
.. code-block:: python
aov.run_anova(
data=df,
dv="response",
between="condition",
posthoc_kw={"effsize": "cohen", "padjust": "holm"}
)
print(aov.ano) # Print ANOVA table
"""
self.alpha = alpha
try:
import pingouin as pg
except BaseException:
raise ImportError("Could not import 'pingouin'")
# do stats
if parametric == "auto":
data, dv = kwargs["data"], kwargs["dv"]
try:
data.reset_index(inplace=True)
except BaseException:
pass
# test if all groups are equal
if "between" in list(kwargs.keys()):
between = kwargs['between']
self.norm_test = pg.normality(data, dv=dv, group=between)
parametric = all(self.norm_test["normal"].values)
else:
self.norm_test = pg.normality(data, dv=dv)
parametric = self.norm_test.iloc[0]
if parametric:
if "covar" in list(kwargs.keys()):
ffunc = pg.ancova
else:
ffunc = pg.anova
self.ano = ffunc(
*args,
**kwargs
)
else:
self.ano = pg.friedman(
*args,
**kwargs
)
# check if there's signifcant results
self.results = self._get_results(self.ano, alpha=self.alpha)
# found sig results; do posthoc
filter_kwargs = [
"covar",
]
for i in filter_kwargs:
if i in list(kwargs.keys()):
kwargs.pop(i)
# found sig results; do posthoc
self.ph_obj = {}
super().__init__(**plot_kw)
self.run_posthoc(ano=self.results, **kwargs, **posthoc_kw)
if isinstance(axs, mpl.axes._axes.Axes):
self.plot_bars(
axs=axs,
**bar_kw
)