Source code for lazyfmri.dataset

import os
import json
import pickle
import numpy as np
import pandas as pd
import nibabel as nb
from scipy import io
import matplotlib.pyplot as plt
from . import glm, plotting, preproc, utils

try:
    import hedfpy
    HEDFPY_AVAILABLE = True
except ImportError:
    HEDFPY_AVAILABLE = False

opj = os.path.join
# disable warning thrown by string2float
pd.options.mode.chained_assignment = None

rb = f"{utils.color.RED}{utils.color.BOLD}"
gb = f"{utils.color.GREEN}{utils.color.BOLD}"
bm = f"{utils.color.MAGENTA}{utils.color.BOLD}"
end = utils.color.END

[docs] def filter_kwargs(ignore_kwargs, kwargs): if isinstance(ignore_kwargs, str): ignore_kwargs = [ignore_kwargs] tmp_kwargs = {} for ii in kwargs: if ii not in ignore_kwargs: tmp_kwargs[ii] = kwargs[ii] return tmp_kwargs
[docs] def check_input_is_list(obj, var=None, list_element=0, matcher="func_file"): if hasattr(obj, var): attr = getattr(obj, var) else: raise ValueError(f"Class does not have '{var}'-attribute") if isinstance(attr, (list, np.ndarray)): if len(attr) != len(getattr(obj, matcher)): raise ValueError( f""" Length of '{var}' ({len(attr)}) does not match number of func files ({len(getattr(obj,matcher))}). Either specify a list of equal lenghts or 1 integer value for all volumes""") return attr[list_element] else: return attr
[docs] class SetAttributes(): def __init__(self): # store ParseEyetracker attributes self.eye_attributes = [ "df_blinks", "df_space_func", "df_space_eye", "df_saccades" ] # store ParseExptoolsFile attributes self.exp_attributes = [ "df_onsets", "df_rts", "df_accuracy", "df_responses", ] # store ParseFuncFile attributes self.func_attributes = [ "df_func_psc", "df_func_raw", "df_func_zscore", "df_func_ica", ] # combine them all for Dataset-class self.all_attributes = self.eye_attributes + \ self.exp_attributes+self.func_attributes
[docs] class ParseEyetrackerFile(SetAttributes): """ParseEyetrackerFile Class for parsing edf-files created during experiments with Exptools2. The class will read in the file, read when the experiment actually started, correct onset times for this start time and time deleted because of removing the first few volumes (to do this correctly, set the `TR` and `deleted_first_timepoints`). You can also provide a numpy array/file containing eye blinks that should be added to the onset times in real-world time (seconds). In principle, it will return a pandas DataFrame indexed by subject and run that can be easily concatenated over runs. This function relies on the naming used when programming the experiment. In the `session.py` file, you should have created ``phase_names=['iti', 'stim']``; the class will use these things to parse the file. Parameters ---------- edf_file: str, list path pointing to the output file of the experiment; can be a list of multiple. Ideally, all these files belong to 1 subject, otherwise it tries to write everything to 1 file, which is too much subject: int subject number in the returned pandas DataFrame (should start with 1, ..., n) run: int run number you'd like to have the onset times for low_pass_pupil_f: float, optional Low-pass cutoff frequency high_pass_pupil_f: float, optional High-pass cutoff frequency TR: float, optional (fMRI) Repetition time of experiment. Together with `nr_vols`, used to determine the period that needs to be extracted after onset of the first trial. Default = None nr_vols: int, optional (fMRI) Together with `TR`, used to determine the period that needs to be extracted after onset of the first trial. Default = None deleted_first_timepoints: int number of volumes to delete to correct onset times for deleted volumes h5_file: str, optional Custom path to h5-file in which to store the complete output from `edf_file`. If nothing's specified, it'll output an `eye.h5`-file in the directory of the first edf-file in the list. Examples ---------- .. code-block:: python from lazyfmri.dataset import ParseExpToolsFile file = 'some/path/to/exptoolsfile.tsv' parsed_file = ParseExpToolsFile( file, subject=1, run=1, button=True ) onsets = parsed_file.get_onset_df() Notes ---------- If you have self-paced experiments or want to extract the full data from the eyetracker, keep `TR` and `nr_vols` **None**. """ def __init__( self, edf_file, eye_h5=None, subject=1, run=1, task=None, low_pass_pupil_f=6.0, high_pass_pupil_f=0.01, func_file=None, TR=0.105, verbose=False, use_bids=True, nr_vols=None, save_as=None, invoked_from_func=False, overwrite=False, **kwargs ): super().__init__() if not HEDFPY_AVAILABLE: raise ModuleNotFoundError( "could not find 'hedfpy', so this functionality is disabled") self.edf_file = edf_file self.func_file = func_file self.sub = subject self.run = run self.task = task self.TR = TR self.low_pass_pupil_f = low_pass_pupil_f self.high_pass_pupil_f = high_pass_pupil_f self.verbose = verbose self.use_bids = use_bids self.nr_vols = nr_vols self.save_as = save_as self.invoked_from_func = invoked_from_func self.overwrite = overwrite self.eye_h5 = eye_h5 self.__dict__.update(kwargs) # deal with edf-files self.edf_file = self.check_input(self.edf_file) self.func_file = self.check_input(self.func_file) # add all files to h5-file if isinstance(self.edf_file, (str, list)): # print message utils.verbose("\nEYETRACKER", self.verbose) # preprocess and fetch dataframes self.set_indices() self.define_hdf_file() self.write_edf_to_hdf() self.preprocess_edf_files() self.ho.close_hdf_file()
[docs] def set_indices(self): # check if we should index task self.index_task = False self.blink_index = ['subject', 'run', 'event_type'] self.eye_index = ['subject', 'run', 'eye', 't'] self.sac_index = ['subject', 'run', 'event_type'] if self.use_bids: self.task_ids = utils.get_ids(self.edf_file, bids="task") # insert task id in indexer if len(self.task_ids) > 1: self.index_task = True for idx in ["blink_index", "eye_index", "sac_index"]: getattr(self, idx).insert(1, "task")
[docs] def check_input(self, in_files): # deal with files if in_files is not None: if isinstance(in_files, str): return [os.path.abspath(in_files)] elif isinstance(in_files, list): return [os.path.abspath(i) for i in in_files] else: raise ValueError( f"Input must be 'str' or 'list', not '{type(in_files)}'")
[docs] def define_hdf_file(self): # get basename self.get_bids_info(self.edf_file[0], 0) self.base_name = self.get_base_name( subID=self.sub, sesID=self.ses ) # check if there's an instance of h5_file if not isinstance(self.eye_h5, str): if self.save_as is not None: self.eye_h5 = opj(self.save_as, f"{self.base_name}_desc-preproc_eye.h5") else: self.eye_h5 = opj(os.path.dirname(self.edf_file[0]), "eye.h5") # check if we should overwrite if self.overwrite: if os.path.exists(self.eye_h5): store = pd.HDFStore(self.eye_h5) store.close() os.remove(self.eye_h5) self.ho = hedfpy.HDFEyeOperator(self.eye_h5)
[docs] def write_edf_to_hdf(self): if not os.path.exists(self.eye_h5): for i, edf_file in enumerate(self.edf_file): if not os.path.exists(edf_file): raise FileNotFoundError( f"Could not read specified file: '{edf_file}'") self.run = i+1 self.sub = 1 if self.use_bids: bids_comps = utils.split_bids_components(edf_file) for el in ['sub', 'run', 'task']: if el in list(bids_comps.keys()): setattr(self, el, bids_comps[el]) # set alias alias = f"run_{self.run}" if isinstance(self.task, str): alias = f"task_{self.task}_run_{self.run}" self.ho.add_edf_file(edf_file) self.ho.edf_message_data_to_hdf(alias=alias) self.ho.edf_gaze_data_to_hdf( alias=alias, pupil_hp=self.high_pass_pupil_f, pupil_lp=self.low_pass_pupil_f ) else: self.ho.open_hdf_file() # clean up hedfpy-files for ext in [".pdf", ".gaz", ".msg", ".gaz.gz", ".asc"]: utils.remove_files( os.path.dirname(self.eye_h5), ext, ext=True )
[docs] def get_base_name( self, subID=None, sesID=None, taskID=None ): base_name = "" # set base name based on presence of bids tags for key, val in zip(["sub", "ses", "task"], [subID, sesID, taskID]): if isinstance(val, (str, float, int)): if key == "sub": base_name = f"{key}-{val}" else: base_name += f"_{key}-{val}" return base_name
[docs] def get_tr(self, run=None): if self.TR is not None: use_TR = check_input_is_list( self, "TR", list_element=run, matcher="edf_file" ) else: use_TR = None return use_TR
[docs] def get_vols(self, i): nr_vols = None if isinstance(self.func_file, list): nr_vols = self.vols(self.func_file[i]) return nr_vols
[docs] def get_bids_info(self, file, i): self.run = i+1 self.ses = None self.task = None if self.use_bids: bids_comps = utils.split_bids_components(file) for el in ['sub', 'run', 'ses', 'task']: if el in list(bids_comps.keys()): setattr(self, el, bids_comps[el])
[docs] def fetch_extracted_data( self, run, task, TR, nr_vols ): alias = f"run_{run}" if isinstance(task, str): alias = f"task_{task}_run_{run}" fetch_data = True try: self.data = self.fetch_relevant_info( TR=TR, task=task, nr_vols=nr_vols, alias=alias ) except Exception: fetch_data = False # collect outputs if all went well if fetch_data: self.df_blinks.append(self.fetch_eyeblinks()) self.df_space_func.append(self.fetch_eye_func_time()) self.df_space_eye.append(self.fetch_eye_tracker_time()) self.df_saccades.append(self.fetch_saccades())
[docs] def concat_dataframes(self): for ii, ll in zip( ["df_blinks", "df_space_eye", "df_saccades"], [self.blink_index, self.eye_index, self.sac_index]): if len(getattr(self, ii)) > 0: setattr(self, ii, pd.concat(getattr(self, ii)).set_index(ll)) if len(self.df_space_func) > 0: # check if all elements are dataframes if all(isinstance(x, pd.DataFrame) for x in self.df_space_func): self.df_space_func = pd.concat( self.df_space_func).set_index(self.eye_index)
[docs] def preprocess_edf_files(self): # set them for internal reference for attr in self.eye_attributes: setattr(self, attr, []) for i, edf_file in enumerate(self.edf_file): utils.verbose(f"Preprocessing {gb}{edf_file}{end}", self.verbose) self.get_bids_info(edf_file, i) # get basename self.base_name = self.get_base_name( subID=self.sub, sesID=self.ses, taskID=self.task ) # check if we got multiple TRs for different edf-files use_TR = self.get_tr(run=self.run) # check volumes nr_vols = self.get_vols(i) # fetch relevant data self.fetch_extracted_data( self.run, self.task, use_TR, nr_vols ) # concatenate available dataframes self.concat_dataframes()
[docs] def fetch_saccades(self): return self.data['saccades']
[docs] def fetch_eye_func_time(self): return self.data['space_func']
[docs] def fetch_eye_tracker_time(self): return self.data['space_eye']
[docs] def fetch_relevant_info( self, task=None, nr_vols=None, alias=None, TR=None, save_as=None ): # load times per session: trial_times = self.ho.read_session_data(alias, 'trials') # fetch duration of scan or read until end of edf file use_end_point = True if TR is not None and nr_vols is not None: func_time = nr_vols*TR use_end_point = False # get block parameters self.session_start_EL_time = trial_times.iloc[0, 0] self.sample_rate = self.ho.sample_rate_during_period(alias) # add number of fMRI*samplerate as stop EL time or read until end of edf file if not use_end_point: self.session_stop_EL_time = self.session_start_EL_time + \ (func_time*self.sample_rate) else: self.session_stop_EL_time = self.ho.block_properties( alias)["block_end_timestamp"].iloc[0] # define period self.time_period = [self.session_start_EL_time, self.session_stop_EL_time] eye = self.ho.eye_during_period(self.time_period, alias) utils.verbose(f" Eye: {rb}{eye}{end}", self.verbose) utils.verbose(f" Sample rate: {rb}{self.sample_rate}{end}", self.verbose) utils.verbose(f" Start time: {rb}{self.time_period[0]}{end}", self.verbose) utils.verbose(f" Stop time: {rb}{self.time_period[1]}{end}", self.verbose) # set some stuff required for successful plotting with seconds on the x-axis n_samples = int(self.time_period[1]-self.time_period[0]) duration_sec = n_samples*(1/self.sample_rate) utils.verbose( f" Duration: {rb}{duration_sec}{end}s [{rb}{n_samples}{end} samples]", self.verbose) # Fetch a bunch of data extract = [ "pupil", "pupil_int", "gaze_x_int", "gaze_y_int", "gaze_x", "gaze_y"] utils.verbose(f" Fetching: [{extract}]", self.verbose) tf = [] tf_rs = [] df_space_func = None for par_ix, extr in enumerate(extract): data = np.squeeze( self.ho.signal_during_period( time_period=self.time_period, alias=alias, signal=extr, requested_eye=eye ).values ) if data.ndim < 2: data = data[..., np.newaxis] tmp = [] tmp_rs = [] for ix, ii in enumerate(list(eye)): if isinstance(nr_vols, int): rs = glm.resample_stim_vector(data[:, ix], nr_vols) hp = preproc.highpass_dct( rs, self.high_pass_pupil_f, TR=TR )[0] psc = np.squeeze( utils.percent_change( rs, 0, baseline=hp.shape[0] ) ) psc_hp = np.squeeze( utils.percent_change( hp, 0, baseline=hp.shape[0] ) ) tmp_rs_df = pd.DataFrame({ f"{extr}": rs, f"{extr}_psc": psc, f"{extr}_hp": hp, f"{extr}_hp_psc": psc_hp} ) if par_ix == len(extr)-1: tmp_rs_df["eye"] = ii tmp_rs_df["t"] = list( (1/self.sample_rate)*np.arange(rs.shape[0])) tmp_rs.append(tmp_rs_df) psc = np.squeeze( utils.percent_change( data[:, ix], 0, baseline=data[:, ix].shape[0] ) ) tmp_df = pd.DataFrame({ f"{extr}": data[:, ix], f"{extr}_psc": psc}) if par_ix == len(extr)-1: tmp_df["eye"] = ii tmp_df["t"] = list((1/self.sample_rate) * np.arange(data.shape[0])) tmp.append(tmp_df) tmp = pd.concat(tmp) tf.append(tmp) if len(tmp_rs) > 0: tmp_df = pd.concat(tmp_rs) tf_rs.append(tmp_df) df_space_eye = pd.concat(tf, axis=1) # nothing to resample if nr_vols==None if len(tf_rs) > 0: df_space_func = pd.concat(tf_rs, axis=1) # add start time to it start_exp_time = trial_times.iloc[0, :][-1] utils.verbose(f" Start time: {rb}{round(start_exp_time, 2)}{end}s", self.verbose) # get onset time of blinks, cluster blinks that occur within 350 ms bb = self.ho.blinks_during_period(self.time_period, alias=alias) onsets = bb["start_timestamp"].values # convert the onsets to seconds in experiment onsets = (onsets-self.time_period[0])*(1/self.sample_rate) # normal eye blink is 1 blink every 4 seconds, throw warning if we found more than a blink per second # ref: https://www.sciencedirect.com/science/article/abs/pii/S0014483599906607 blink_rate = len(onsets)/duration_sec utils.verbose( f" Nr blinks: {rb}{len(onsets)}{end} [{rb}{round(blink_rate, 2)}{end} blinks per second]", self.verbose) # extract saccades df_saccades = self.ho.detect_saccades_during_period( time_period=self.time_period, alias=alias) if len(df_saccades) > 0: utils.verbose( f" Nr saccades: {rb}{len(df_saccades)}{end} saccades", self.verbose) # convert saccade onset to seconds relative to start of run df_saccades["onset"] = df_saccades["expanded_start_time"] / \ self.sample_rate # add some stuff for indexing for tt, mm in zip(["subject", "run", "event_type"], [self.sub, self.run, "saccade"]): df_saccades[tt] = mm # set task index df_saccades = self.set_task_index(df_saccades, task=task) # build dataframe with relevant information self.tmp_df = df_space_eye.copy() df_space_eye = pd.DataFrame(df_space_eye) # index df_space_eye['subject'], df_space_eye['run'] = self.sub, self.run # set task index df_space_eye = self.set_task_index(df_space_eye, task=task) # index if isinstance(df_space_func, pd.DataFrame): df_space_func['subject'], df_space_func['run'] = self.sub, self.run # set task index df_space_func = self.set_task_index(df_space_func, task=task) # index df_blink_events = pd.DataFrame(onsets, columns=['onset']) for tt, mm in zip(['subject', 'run', 'event_type'], [self.sub, self.run, "blink"]): df_blink_events[tt] = mm # set task index df_blink_events = self.set_task_index(df_blink_events, task=task) utils.verbose("Done", self.verbose) return_dict = { "space_eye": df_space_eye, "space_func": df_space_func, "blink_events": df_blink_events} if len(df_saccades) > 0: return_dict["saccades"] = df_saccades if self.report: # make some plots if save_as is None: save_as = opj(os.getcwd(), "figures") if not os.path.exists(save_as): os.makedirs(save_as, exist_ok=True) fname = opj(save_as, f"{self.base_name}_run-{self.run}_desc") self.plot_trace_and_heatmap( df_space_eye, fname=fname ) return return_dict
[docs] def set_task_index(self, df, task=None): # index task if required if self.index_task: if isinstance(task, str): df["task"] = task return df
[docs] def plot_trace_and_heatmap( self, df, fname=None, screen_size=(1920, 1080), scale="screen" ): if not isinstance(df, pd.DataFrame): raise TypeError(f"Input must be a pd.Dataframe, not {type(df)}") use_eyes = list(np.unique(df.reset_index()['eye'])) fig = plt.figure(figsize=(24, len(use_eyes)*6), constrained_layout=True) if len(use_eyes) > 1: y = 1.075 hspace = 0.12 else: y = 1.15 hspace = 0 sf = fig.subfigures(nrows=len(use_eyes), hspace=hspace) fig.suptitle('positional stability', fontsize=32, y=y) for ii, eye in enumerate(use_eyes): if len(use_eyes) > 1: sf_a = sf[ii] else: sf_a = sf axs = sf_a.subplots(ncols=2, gridspec_kw={ "width_ratios": [0.35, 0.65], "hspace": hspace}) eye_df = utils.select_from_df(df, expression=f"eye = {eye}") input_l = [eye_df[f"gaze_{i}_int"].values for i in ["x", "y"]] # x-coordinate of the fixation cross x_cor_fixcross = screen_size[0]/2 # y-coordinate of the fixation cross y_cor_fixcross = screen_size[1]/2 x_coor_relfix = (input_l[0] - x_cor_fixcross) y_coor_relfix = (input_l[1] - y_cor_fixcross) if isinstance(scale, str): if scale == "auto": ext = [x_coor_relfix.min(), x_coor_relfix.max(), y_coor_relfix.min(), y_coor_relfix.max()] elif scale == "screen": xlim = (-screen_size[0]//2, screen_size[0]//2) ylim = (-screen_size[1]//2, screen_size[1]//2) ext = [xlim[0], xlim[1], ylim[0], ylim[1]] else: raise ValueError( f"Unknown scale '{scale}' specified. Must be 'auto' or 'screen', or specify a tuple") axs[0].hexbin( x_coor_relfix, y_coor_relfix, gridsize=100, cmap="magma", extent=ext ) axs[0].axhline(y=0, color='white', linestyle='-', linewidth=0.5) axs[0].axvline(x=0, color='white', linestyle='-', linewidth=0.5) plotting.conform_ax_to_obj( ax=axs[0], y_label="y position", x_label="x position" ) avg = [float(input_l[i].mean()) for i in range(len(input_l))] std = [float(input_l[i].std()) for i in range(len(input_l))] plotting.LazyLine( input_l, line_width=2, axs=axs[1], color=["#1B9E77", "#D95F02"], labels=[f"gaze {i} (M={round(avg[ix],2)}; SD={round(std[ix],2)}px)" for ix, i in enumerate( ["x", "y"])], x_label="samples", y_label="position (pixels)", add_hline={"pos": avg}, ) y_pos = 1.05 sf_a.suptitle( f"eye-{eye}", fontsize=24, y=y_pos, fontweight="bold" ) # plt.tight_layout() if isinstance(fname, str): fname += f"-eye_qa.{self.save_ext}" utils.verbose(f" Writing {bm}{fname}{end}", self.verbose) fig.savefig( fname, bbox_inches='tight', dpi=300 )
[docs] def vols(self, func_file): if func_file.endswith("gz") or func_file.endswith('nii'): img = nb.load(func_file) nr_vols = img.get_fdata().shape[-1] self.TR2 = img.header['pixdim'][4] elif func_file.endswith("mat"): raw = io.loadmat(func_file) tag = list(raw.keys())[-1] raw = raw[tag] nr_vols = raw.shape[-1] else: raise ValueError( f"Could not derive number of volumes for file '{func_file}'") return nr_vols
[docs] class ParseExpToolsFile(ParseEyetrackerFile, SetAttributes): """ParseExpToolsFile() Class for parsing tsv-files created during experiments with Exptools2. The class will read in the file, read when the experiment actually started, correct onset times for this start time and time deleted because of removing the first few volumes (to do this correctly, set the `TR` and `deleted_first_timepoints`). You can also provide a numpy array/file containing eye blinks that should be added to the onset times in real-world time (seconds). In principle, it will return a pandas DataFrame indexed by subject and run that can be easily concatenated over runs. This function relies on the naming used when programming the experiment. In the `session.py` file, you should have created `phase_names=['iti', 'stim']`; the class will use these things to parse the file. Parameters ---------- tsv_file: str, list path pointing to the output file of the experiment subject: int subject number in the returned pandas DataFrame (should start with 1, ..., n) run: int run number you'd like to have the onset times for button: bool boolean whether to include onset times of button responses (default is false). ['space'] will be ignored as response TR: float repetition time to correct onset times for deleted volumes deleted_first_timepoints: int number of volumes to delete to correct onset times for deleted volumes. Can be specified for each individual run if `tsv_file` is a list use_bids: bool, optional If true, we'll read BIDS-components such as 'sub', 'run', 'task', etc from the input file and use those as indexers, rather than sequential 1,2,3. funcs: str, list, optional List of functional files that is being passed down down to :class:`lazyfmri.dataset.ParseEyetrackerFile`. Required for correct resampling to functional space edfs: str, list, optional List of eyetracking output files that is being passed down down to :class:`lazyfmri.dataset.ParseEyetrackerFile`. verbose: bool, optional Print details to the terminal, default is False phase_onset: int, optional Which phase of exptools-trial should be considered the actual stimulus trial. Usually, `phase_onset=0` means the interstimulus interval. Therefore, default = 1 stim_duration: str, int, optional If desired, add stimulus duration to onset dataframe. Can be one of 'None', 'stim' (to use duration from exptools' log file) or any given integer add_events: str, list, optional Add additional events to onset dataframe. Must be an existing column in the exptools log file. For intance, `responses` and `event_type = stim` are read in by default, but if we have a separate column containing the onset of some target (e.g., 'target_onset'), we can add these times to the dataframe with `add_events='target_onset'`. event_names: str, list, optional Custom names for manually added events through `add_events` if the column names are not the names you want to use in the dataframe. E.g., if I find `target_onset` too long of a name, I can specify `event_names='target'`. If `add_events` is a list, then `event_names` must be a list of equal length if custom names are desired. By default we'll take the names from `add_events` RTs: bool, optional If we have a design that required some response to a stimulus, we can request the reaction times. Default = False RT_relative_to: str, optional If `RTs=True`, we need to know relative to what time the button response should be offset. Only correct responses are considered, as there's a conditional statement that requires the present of the reference time (e.g., `target_onset`) and button response. If there's a response but no reference time, the reaction time cannot be calculated. If you do not have a separate reference time column, you can specify `RT_relative_to='start'` to calculate the reaction time relative to onset time. If `RT_relative_to != 'start'`, I'll assume you had a target in your experiment in X/n_trials. From this, we can calculate the accuracy and save that to `self.df_accuracy`, while reaction times are saved in `self.df_rts` button_duration: float, int, optional Set duration for button event response_window: float, int, optional Set window in which a response is counted merge: bool, optional Merge the dataframes containing responses and stimulus onsets. Default = True. Onset times will be sorted in an ascending order. Select parts of dataframes with :func:`lazyfmri.utils.select_from_df()` resp_as_cov: bool, optional If you have a design where you have button presses in half of the trials, you can add a covariate consisting of -1/1 for the stimulus onsets where there was a response or not. This way, the response event will not steal variance from the stimulus event when fitting using :class:`lazyfmri.fitting.NideconvFitter()`. Default = False ev_onset: str, optional Sometimes experiments are coded such that the event name of interest is not called `stim`. Use this variable + `phase_onset` to extract the correct onset times key_press: list, optional Specify a custom list of button presses to extract. Default = `['b']` expr: str, tuple, optional Add additional conditions for your extracted onset times. This follows the formulation of :func:`lazyfmri.utils.select_from_df()`. E.g., `expr="movie_type != blank`. Acts as an extra filter to extract the relevant onsets filter_na: bool, optional Try to filter out NaN-events from the onset dataframe Examples ---------- .. code-block:: python from lazyfmri.dataset import ParseExpToolsFile file = 'some/path/to/exptoolsfile.tsv' parsed_file = ParseExpToolsFile(file, subject=1, run=1, button=True) onsets = parsed_file.get_onset_df() """ def __init__( self, tsv_file, subject=1, run=1, task=None, button=False, RTs=False, RT_relative_to=None, TR=0.105, deleted_first_timepoints=0, edfs=None, funcs=None, use_bids=True, verbose=False, phase_onset=1, stim_duration=None, add_events=None, add_cols=None, event_names=None, invoked_from_func=False, button_duration=1, response_window=3, merge=True, resp_as_cov=False, cov_amplitude=1, ev_onset="stim", duration_col="duration", key_press=["b"], expr=None, filter_na=False, **kwargs ): self.tsv_file = tsv_file self.sub = subject self.run = run self.task = task self.TR = TR self.deleted_first_timepoints = deleted_first_timepoints self.button = button self.funcs = funcs self.edfs = edfs self.use_bids = use_bids self.verbose = verbose self.phase_onset = phase_onset self.stim_duration = stim_duration self.RTs = RTs self.RT_relative_to = RT_relative_to self.add_events = add_events self.event_names = event_names self.invoked_from_func = invoked_from_func self.button_duration = button_duration self.response_window = response_window self.merge = merge self.resp_as_cov = resp_as_cov self.key_press = key_press self.ev_onset = ev_onset self.expr = expr self.duration_col = duration_col self.add_cols = add_cols self.cov_amplitude = cov_amplitude self.filter_na = filter_na # filter kwargs tmp_kwargs = filter_kwargs( [ "ref_slice", "filter_strat", ], kwargs ) # set attributes SetAttributes.__init__(self) # process edf self.process_edf_file(**tmp_kwargs) # deal with exptools files self.process_exptools_files()
[docs] def process_edf_file(self, **kwargs): if self.edfs is not None or hasattr(self, "h5_file"): ParseEyetrackerFile.__init__( self, self.edfs, subject=self.sub, func_file=self.funcs, TR=self.TR, use_bids=self.use_bids, verbose=self.verbose, invoked_from_func=self.invoked_from_func, **kwargs )
[docs] def process_exptools_files(self): utils.verbose("\nEXPTOOLS", self.verbose) if isinstance(self.tsv_file, str): self.tsv_file = [self.tsv_file] if isinstance(self.tsv_file, list): self.onset_index = ['subject', 'run', 'event_type'] self._index = ['subject', 'run'] self.index_task = False self.task_ids = [] if self.use_bids: self.task_ids = utils.get_ids(self.tsv_file, bids="task") else: if isinstance(self.task, str): self.task_ids = [self.task] # insert task id in indexer if len(self.task_ids) > 0: self.index_task = True for idx in ["onset_index", "_index"]: getattr(self, idx).insert(1, "task") # set them for internal reference for attr in self.exp_attributes: setattr(self, attr, []) for run, onset_file in enumerate(self.tsv_file): self.run = run+1 self.ses = None if self.use_bids: bids_comps = utils.split_bids_components(onset_file) for el in ['sub', 'run', 'ses', 'task']: if el in list(bids_comps.keys()): setattr(self, el, bids_comps[el]) # check if we got different nr of vols to delete per run delete_vols = check_input_is_list( self, "deleted_first_timepoints", list_element=run, matcher="tsv_file") # check if we got different stimulus durations per run duration = check_input_is_list( self, var="stim_duration", list_element=run, matcher="tsv_file" ) # read in the exptools-file self.preprocess_exptools_file( onset_file, run=self.run, task=self.task, delete_vols=delete_vols, phase_onset=self.phase_onset, duration=duration ) # append to df self.df_onsets.append(self.get_onset_df(index=False)) # check if we got RTs try: self.df_rts.append(self.get_rts_df(index=False)) except Exception: pass # check if we got accuracy (only if RT_relative_to != 'start') try: self.df_accuracy.append(self.get_accuracy(index=False)) except Exception: pass # check if we got responses (only if button == False) try: self.df_responses.append(self.get_responses(index=False)) except Exception: pass # concatemate df self.df_onsets = pd.concat( self.df_onsets).set_index(self.onset_index) # filter for NaNs to be sure if self.filter_na: self.df_onsets = utils.select_from_df( self.df_onsets, expression="event_type != nan") # rts try: self.df_rts = pd.concat(self.df_rts).set_index(self._index) except Exception: pass # accuracy try: self.df_accuracy = pd.concat( self.df_accuracy).set_index(self._index) except Exception: pass # accuracy try: self.df_responses = pd.concat( self.df_responses).set_index(self.onset_index) except Exception: pass # get events per run self.evs_per_run = utils.get_unique_ids( self.df_onsets, id="event_type") # check if we should merge responses with onsets if self.merge: # keep pure onsets too self.df_onsets_pure = self.df_onsets.copy() self.concat_list = [self.df_onsets] # look for responses if len(self.df_responses) > 0: self.concat_list += [self.df_responses.copy()] # look for blinks if hasattr(self, "df_blinks"): self.concat_list += [self.df_blinks.copy()] # concatenate and sort if len(self.concat_list) > 0: self.merged = pd.concat(self.concat_list).sort_values( ["subject", "run", "onset"]) # set merged to new df_onsets self.df_onsets = self.merged.copy()
[docs] def events_per_run(self): n_runs = np.unique(self.df_onsets.reset_index()['run'].values) events = {} for run in n_runs: df = utils.select_from_df( self.df_onsets, expression=f"run = {run}", index=None) events[run] = np.unique(df['event_type'].values) return events
[docs] def events_single_run(self, run=1): return self.events_per_run[run]
[docs] def preprocess_exptools_file( self, tsv_file, task=None, run=1, delete_vols=0, phase_onset=1, duration=None): utils.verbose(f"Preprocessing {gb}{tsv_file}{end}", self.verbose) with open(tsv_file) as f: self.data = pd.read_csv(f, delimiter='\t') # trim onsets to first 't' delete_time = delete_vols*self.TR self.ev_types = utils.get_unique_ids(self.data, id="event_type") if "pulse" in self.ev_types: self.start_time = float(utils.select_from_df( self.data, expression="event_type = pulse").iloc[0].onset) else: self.start_time = 0 utils.verbose(f" 1st 't' @{rb}{round(self.start_time,2)}{end}s", self.verbose) # select all events after start time with `ev_onset` self.trimmed = utils.select_from_df( self.data, expression=( f"event_type = {self.ev_onset}", "&", f"onset > {self.start_time}" ) ) # check for more conditions if isinstance(self.expr, (tuple, str)): utils.verbose(f" Adding filters: {rb}{self.expr}{end}", self.verbose) self.trimmed = utils.select_from_df( self.trimmed, expression=self.expr) # get the correct phase self.trimmed = utils.select_from_df( self.trimmed, expression=f"phase = {phase_onset}" ) rm_cols = ["index", "level_0"] for col in rm_cols: if col in list(self.trimmed.columns): self.trimmed.drop([col], axis=1, inplace=True) # get onset times self.onset_times = self.trimmed['onset'].values[..., np.newaxis] self.n_trials = np.unique(self.trimmed["onset"].values).shape[0] skip_duration = False if isinstance(duration, (float, int)): self.durations = np.full_like(self.onset_times, float(duration)) elif duration is None: skip_duration = True else: self.durations = self.trimmed[self.duration_col].values[..., np.newaxis] try: self.condition = self.trimmed['condition'].values[..., np.newaxis] except Exception: self.condition = np.full_like( self.onset_times, self.ev_onset, dtype=object) if self.condition.ndim < 2: self.condition = self.condition[..., np.newaxis] # get dataframe with responses if "response" in np.unique(self.data["event_type"]): self.response_df = self.data.loc[(self.data['event_type'] == "response") & ( self.data['response'] != 'space')] if len(self.response_df) > 0: # filter out button responses before first trigger self.response_df = utils.select_from_df( self.response_df, expression=f"onset > {self.start_time}") self.nr_resp = self.response_df.shape[0] utils.verbose( f" Extracting {rb}{self.key_press}{end} button(s)", self.verbose) # loop through them self.button_df = [] self.single_button = True if len(self.key_press) > 1: self.single_button = False for button in self.key_press: # get the onset times self.response_times = self.response_df['onset'].values[..., np.newaxis] # store responses in separate dataframe self.response_times -= (self.start_time + delete_time) # decide name if self.single_button: ev_name = "response" else: ev_name = button # make into dataframe tmp = pd.DataFrame(self.response_times, columns=["onset"]) tmp["event_type"] = ev_name self.tmp_df = self.index_onset( tmp, task=task, subject=self.sub, run=run) self.button_df.append(self.tmp_df) if len(self.button_df) > 0: self.button_df = pd.concat(self.button_df) else: utils.verbose(" No button presses found", self.verbose) # check if we should include other events if isinstance(self.add_events, str): self.add_events = [self.add_events] if isinstance(self.event_names, str): self.event_names = [self.event_names] if isinstance(self.add_events, list): if isinstance(self.event_names, list): if len(self.event_names) != len(self.add_events): raise ValueError( f"""Length ({rb}{len(self.add_events)}{end}) of added events {rb}{self.add_events}{end} does not equal the length ({rb}{len(self.event_names)}{end}) of requested event names {rb}{self.event_names}{end}""") else: self.event_names = self.add_events.copy() for ix, ev in enumerate(self.add_events): ev_times = np.array( [ii for ii in np.unique(self.data[ev].values)]) # filter for nan (https://stackoverflow.com/a/11620982) ev_times = ev_times[~np.isnan(ev_times)][..., np.newaxis] # create condition ev_names = np.full(ev_times.shape, self.event_names[ix]) # add times and names to array self.onset_times = np.vstack((self.onset_times, ev_times)) self.condition = np.vstack((self.condition, ev_names)) # check if we should add duration (can't be used in combination with add_events) if not skip_duration: if isinstance(self.add_events, list): raise TypeError( f"""Cannot do this operation because I don't know the durations for the added events. Please consider using 'stim_duration!={self.stim_duration}' or 'add_events=None'""") self.onset = np.hstack( (self.onset_times, self.condition, self.durations)) else: self.onset = np.hstack((self.onset_times, self.condition)) if isinstance(self.add_cols, (str, list)): if isinstance(self.add_cols, str): self.add_cols = [self.add_cols] utils.verbose(f" Adding {rb}{self.add_cols}{end} to onsets", self.verbose) add_cols = [] for i in self.add_cols: add_cols.append(self.trimmed[i].values[..., np.newaxis]) add_cols = np.hstack(add_cols) self.onset = np.hstack([self.onset, add_cols]) # sort array based on onset times (https://stackoverflow.com/a/2828121) self.onset = self.onset[self.onset[:, 0].argsort()] # correct for start time of experiment and deleted time due to removal of inital volumes self.onset[:, 0] = self.onset[:, 0]-(self.start_time + delete_time) utils.verbose( f" Cutting {rb}{round(self.start_time + delete_time,2)}{end}s from onsets", self.verbose) if not skip_duration: utils.verbose( f" Avg duration = {rb}{round(self.durations.mean(),2)}{end}s", self.verbose) # make dataframe columns = ['onset', 'event_type'] if not skip_duration: columns += ['duration'] if isinstance(self.add_cols, list): columns += self.add_cols # check if we should do reaction times if self.RTs: if not isinstance(self.RT_relative_to, str): raise ValueError( f"Need a reference column to calculate reaction times (RTs), not '{self.RT_relative_to}'") # get response dataframe self.response_df = self.data.loc[(self.data['event_type'] == "response") & ( self.data['response'] != 'space')] # get target dataframe self.target_df = self.data.loc[~pd.isnull(self.data.target_onset)] # now we need to cross references them self.n_hits = 0 self.n_miss = 0 self.n_fa = 0 self.n_cr = 0 self.rts = [] self.unique_targets = np.unique(self.target_df["trial_nr"].values) self.n_targets = self.unique_targets.shape[0] self.n_responses = len( np.unique(self.response_df["trial_nr"].values)) for trial in self.unique_targets: # get target onset trial_targ = self.target_df.loc[( self.target_df["trial_nr"] == trial)] targ_onset = trial_targ["target_onset"].values[0] # check if there's a response within window regardless of whether response occured in trial ID resp = self.response_df.query( f"{targ_onset} < onset < {targ_onset+self.response_window}") if len(resp) > 0: # found response resp_time = resp["onset"].values[0] if resp_time > targ_onset: rt = resp_time-targ_onset self.rts.append(rt) self.n_hits += 1 if len(self.rts) >= 1: self.rts = np.array(self.rts) else: self.rts = np.array([0]) if self.n_hits > 0: self.hits = self.n_hits/self.n_targets self.n_miss = self.n_targets-self.n_hits else: self.n_miss = self.n_targets # track false alarms self.unique_responses = np.unique( self.response_df["trial_nr"].values) # track false alarms if self.n_responses > self.n_targets: self.n_fa = self.n_responses-self.n_targets else: self.n_fa = 0 # track correct rejections if self.n_fa > 0: self.n_cr = self.n_targets-self.n_fa else: self.n_cr = int(self.n_trials-self.n_targets) # calculate d-prime # d-prime=0 is considered as pure guessing. # d-prime=1 is considered as good measure of signal sensitivity/detectability. # d-prime=2 is considered as awesome. self.sdt_ = utils.SDT( self.n_hits, self.n_miss, self.n_fa, self.n_cr) if hasattr(self, 'sdt_'): utils.verbose( f" Hits:\t{round(self.sdt_['hit'],2)}\t({self.n_hits}/{self.n_targets})", self.verbose) utils.verbose( f" FA:\t{round(self.sdt_['fa'],2)}\t({self.n_fa}/{self.n_targets})", self.verbose) utils.verbose( f" D':\t{round(self.sdt_['d'],2)}\t(0=guessing;1=good;2=awesome)", self.verbose) utils.verbose( f" Average reaction time (RT) = {round(self.rts.mean(),2)}s (relative to '{self.RT_relative_to}').", self.verbose) # parse into dataframe self.accuracy_df = self.index_accuracy( self.sdt_, subject=self.sub, run=run) self.rts_df = self.array_to_df( self.rts, columns=["RTs"], subject=self.sub, run=run, key="RTs") # keep track of response during trial self.response_during_trial = np.full( (self.n_trials), -self.cov_amplitude) self.cov_times = [] for stim in range(self.n_trials): # get onset info self.trial_onset = self.trimmed.iloc[stim] # set default response time to stimulus onset append_time = self.trial_onset.onset # look if there's an actual response to stimulus trial = self.trial_onset.trial_nr if trial in self.unique_targets: trial_targ = self.target_df.loc[( self.target_df["trial_nr"] == trial)] targ_onset = trial_targ[self.RT_relative_to].values[0] # check if there's a response within window regardless of whether response occured in trial ID resp = self.response_df.query( f"{targ_onset} < onset < {targ_onset+self.response_window}") # response during trial if len(resp) > 0: resp_time = resp["onset"].values[0] self.response_during_trial[stim] = self.cov_amplitude append_time = resp_time self.cov_times.append(append_time) # for stimuli without responses, we'll create a dummy onset time consisting of the response time of the closest # button press DURING stimulus relative to the onset of stimulus for stim in range(self.n_trials): # we have -1 for stimuli without response time if self.response_during_trial[stim] < 0: # find closest trial with button press closest_trial = utils.find_nearest( self.response_during_trial[stim:], self.cov_amplitude)[0] closest_trial += stim # get reaction time of this stim trial_onset = self.trimmed.iloc[closest_trial].onset resp_onset = self.cov_times[closest_trial] rt = resp_onset-trial_onset # deal with button presses BEFORE stimulus onset if rt == 0: # invert array for last element closest_trial = utils.find_nearest( self.response_during_trial[::-1][-stim:], self.cov_amplitude)[0] closest_trial = (stim-closest_trial)-1 # get reaction time of this stim trial_onset = self.trimmed.iloc[closest_trial].onset resp_onset = self.cov_times[closest_trial] rt = resp_onset-trial_onset if rt == 0: raise ValueError("RT=0, something odd's going on here") # shift mock response onset with this RT # print(f"Adding {rt} to trial #{stim+1}") self.cov_times[stim] += rt # convert list to array and correct for start time and deleted volumes self.cov_times = np.array(self.cov_times) self.cov_times -= (self.start_time + delete_time) # create dataframe self.cov_df = pd.DataFrame( { "onset": self.cov_times, "cov": self.response_during_trial } ) for key, val in zip( ["event_type", "subject", "run"], ["response", self.sub, run]): self.cov_df[key] = val # inset onsets self.onset_df = self.index_onset( self.onset, task=task, columns=columns, subject=self.sub, run=run) # add response covariate column THIS IS A SHORTCUT FOR NOW! if self.resp_as_cov: # self.response_during_trial self.onset_df["cov"] = self.cov_amplitude
[docs] def index_onset( self, array, columns=None, subject=1, run=1, task=None, set_index=False): if isinstance(array, dict): df = pd.DataFrame(array, index=[0]) elif isinstance(array, pd.DataFrame): df = array.copy() else: df = pd.DataFrame(array, columns=columns) df['subject'] = subject df['run'] = run df['event_type'] = df['event_type'].astype(str) df['onset'] = df['onset'].astype(float) if self.index_task: if isinstance(task, str): df["task"] = task else: df["task"] = "task" # check if we got duration try: df['duration'] = df['duration'].astype(float) except Exception: pass if set_index: return df.set_index(self.onset_index) else: return df
[docs] @staticmethod def array_to_df( array, columns=None, subject=1, key="RTs", run=1, set_index=False): if isinstance(array, dict): df = pd.DataFrame(array) else: df = pd.DataFrame(array, columns=columns) df['subject'] = subject df['run'] = run df[key] = df[key].astype(float) if set_index: return df.set_index(['subject', 'run']) else: return df
[docs] @staticmethod def index_accuracy(array, columns=None, subject=1, run=1, set_index=False): if isinstance(array, dict): df = pd.DataFrame(array, index=[0]) else: df = pd.DataFrame(array, columns=columns) df['subject'] = subject df['run'] = run if set_index: return df.set_index(['subject', 'run']) else: return df
[docs] def get_onset_df(self, index=False): """Return the indexed DataFrame containing onset times""" if index: return self.onset_df.set_index(['subject', 'run', 'event_type']) else: return self.onset_df
[docs] def get_rts_df(self, index=False): """Return the indexed DataFrame containing reaction times""" if index: return self.rts_df.set_index(['subject', 'run']) else: return self.rts_df
[docs] def get_accuracy(self, index=False): """Return the indexed DataFrame containing reaction times""" if index: return self.accuracy_df.set_index(['subject', 'run']) else: return self.accuracy_df
[docs] def get_responses(self, index=False): """Return the indexed DataFrame containing reaction times""" if self.resp_as_cov: ret_df = self.cov_df.copy() else: ret_df = self.button_df.copy() if index: return ret_df.set_index(['subject', 'run']) else: return ret_df
[docs] def onsets_to_fsl( self, fmt='3-column', amplitude=1, duration=None, output_dir=None, output_base=None, from_event=True): """onsets_to_fsl This function creates a text file with a single column containing the onset times of a given condition. Such a file can be used for SPM or FSL modeling, but it should be noted that the onset times have been corrected for the deleted volumes at the beginning. So make sure your inputting the correct functional data in these cases. Parameters ---------- fmt: str format for the onset file (default = 3-column format) amplitude: int, float amplitude for stimulus vector duration: int, float duration of the event; overwrite possible 'duration' column in onsets output_dir: str path to output name for text file(s) output_base: str basename for output file(s); should include full path. '<_task-{task}>_run-{run}_ev-{ev}.txt' will be appended from_event: bool take the event name as specified in the onset dataframe. By default, this is true. In some cases where your events consists of float numbers, it's sometimes easier to number them consecutively. In that case, specify from_event=False` Returns ---------- str for each subject, task, and run, a text file for all events present in the onset dataframe (if only 1 task was present, this will be omitted) """ onsets = self.df_onsets.copy() subj_list = self.get_subjects(onsets) for sub in subj_list: # fetch subject specific onsets df = utils.select_from_df(onsets, expression=f"subject = {sub}") # check if we got task in dataframe indices = list(onsets.index.names) tasks = [] separate_task = False if self.use_bids: if "task" in indices: tasks = utils.get_ids(self.tsv_file) separate_task = True # we got task in dataframe if separate_task: for task in tasks: # get task-specific onsets task_df = utils.select_from_df( df, expression=f"task = {task}") # get runs within task n_runs = self.get_runs(task_df) for run in n_runs: onsets_per_run = utils.select_from_df( task_df, expression=f"run = {run}") events_per_run = self.get_events(onsets_per_run) for ix, ev in enumerate(events_per_run): onsets_per_event = utils.select_from_df( onsets_per_run, expression=f"event_type = {events_per_run[ix]}") if from_event: ev_tag = f"ev-{ev}" else: ev_tag = f"ev-{ix+1}" if output_base is None: if not isinstance(output_dir, str): if isinstance(self.tsv_file, list): outdir = os.path.dirname( self.tsv_file[0]) elif isinstance(self.tsv_file, str): outdir = os.path.dirname(self.tsv_file) else: outdir = os.getcwd() else: outdir = output_dir # create output directory if not os.path.exists(outdir): os.makedirs(outdir, exist_ok=True) fname = opj( outdir, f"task-{task}_run-{run}_{ev_tag}.txt") else: fname = f"{output_base}_task-{task}_run-{run}_{ev_tag}.txt" # fetch the onsets event_onsets = onsets_per_event['onset'].values[..., np.newaxis] if fmt == "3-column": # check if we got duration if 'duration' in list(onsets_per_event.columns): duration_arr = onsets_per_event['duration'].values[..., np.newaxis] else: if not isinstance(duration, (int, float)): duration_arr = np.full_like( onsets_per_event, duration) else: duration_arr = np.ones_like( onsets_per_event) amplitude_arr = np.full_like( event_onsets, amplitude) three_col = np.hstack( (event_onsets, duration_arr, amplitude_arr)) print(f"Writing {fname}; {three_col.shape}") np.savetxt(fname, three_col, delimiter='\t', fmt='%1.3f') else: np.savetxt(fname, event_onsets, delimiter='\t', fmt='%1.3f') else: n_runs = self.get_runs(df) for run in n_runs: onsets_per_run = utils.select_from_df( df, expression=f"run = {run}") events_per_run = self.get_events(onsets_per_run) for ix, ev in enumerate(events_per_run): onsets_per_event = utils.select_from_df( onsets_per_run, expression=f"event_type = {events_per_run[ix]}") if from_event: ev_tag = f"ev-{ev}" else: ev_tag = f"ev-{ix+1}" if output_base is None: if not isinstance(output_dir, str): if isinstance(self.tsv_file, list): outdir = os.path.dirname(self.tsv_file[0]) elif isinstance(self.tsv_file, str): outdir = os.path.dirname(self.tsv_file) else: outdir = os.getcwd() else: outdir = output_dir fname = opj(outdir, f"{ev_tag}_run-{run}.txt") else: fname = f"{output_base}_run-{run}_{ev_tag}.txt" # fetch the onsets event_onsets = onsets_per_event['onset'].values[..., np.newaxis] if fmt == "3-column": # check if we got duration if 'duration' in list(onsets_per_event.columns): duration_arr = onsets_per_event['duration'].values[..., np.newaxis] else: if not isinstance(duration, (int, float)): duration_arr = np.full_like( onsets_per_event, duration) else: duration_arr = np.ones_like( onsets_per_event) amplitude_arr = np.full_like( event_onsets, amplitude) three_col = np.hstack( (event_onsets, duration_arr, amplitude_arr)) print(f"Writing {fname}; {three_col.shape}") np.savetxt(fname, three_col, delimiter='\t', fmt='%1.3f') else: np.savetxt(fname, event_onsets, delimiter='\t', fmt='%1.3f')
[docs] @staticmethod def get_subjects(df): try: df = df.reset_index() except Exception: pass return np.unique(df['subject'].values)
[docs] @staticmethod def get_runs(df): try: df = df.reset_index() except Exception: pass return np.unique(df['run'].values)
[docs] @staticmethod def get_events(df): try: df = df.reset_index() except Exception: pass return np.unique(df['event_type'].values)
[docs] class ParsePhysioFile(): """ParsePhysioFile In similar style to :class:`lazyfmri.dataset.ParseExpToolsFile` and :class:`lazyfmri.dataset.ParseFuncFile`, we use this class to read in physiology-files created with the PhysIO-toolbox (https://www.tnu.ethz.ch/en/software/tapas/ documentations/physio-toolbox) (via `call_spmphysio` for instance). Using the *.mat*-file created with `PhysIO`, we can also attempt to extract `heart rate variability` measures. If this file cannot be found, this operation will be skipped Parameters ---------- physio_file: str path pointing to the regressor file created with PhysIO (e.g., `call_spmphysio`) physio_mat: str path pointing to the *.mat*-file created with PhysIO (e.g., `call_spmphysio`) subject: int subject number in the returned pandas DataFrame (should start with 1, ..., n) run: int run number you'd like to have the onset times for TR: float repetition time to correct onset times for deleted volumes orders: list list of orders used to create the regressor files (see `call_spmphysio`, but default = [2,2,2,]). This one is necessary to create the correct column names for the dataframe deleted_first_timepoints: int, optional number of volumes deleted at the beginning of the timeseries deleted_last_timepoints: int, optional number of volumes deleted at the end of the timeseries Example ---------- .. code-block:: python physio_file = opj(os.path.dirname(func_file), "sub-001_ses-1_task-SR_run-1_physio.txt") physio_mat = opj(os.path.dirname(func_file), "sub-001_ses-1_task-SR_run-1_physio.mat") physio = utils.ParsePhysioFile( physio_file, physio_mat=physio_mat, subject=func.subject, run=func.run, TR=func.TR, deleted_first_timepoints=func.deleted_first_timepoints, deleted_last_timepoints=func.deleted_last_timepoints) physio_df = physio.get_physio(index=False) """ def __init__( self, physio_file, physio_mat=None, subject=1, run=1, TR=0.105, orders=[3, 4, 1], deleted_first_timepoints=0, deleted_last_timepoints=0, use_bids=False, verbose=True, **kwargs): self.physio_file = physio_file self.physio_mat = physio_mat self.sub = subject self.run = run self.TR = TR self.orders = orders self.deleted_first_timepoints = deleted_first_timepoints self.deleted_last_timepoints = deleted_last_timepoints self.physio_mat = physio_mat self.use_bids = use_bids self.verbose = verbose self.__dict__.update(kwargs) utils.verbose("\nPHYSIO", self.verbose) self.physio_cols = [f'c_{i}' for i in range(self.orders[0])] + [f'r_{i}' for i in range( self.orders[1])] + [f'cr_{i}' for i in range(self.orders[2])] if isinstance(self.physio_file, str): self.physio_file = [self.physio_file] if isinstance(self.physio_mat, str): self.physio_mat = [self.physio_mat] if isinstance(self.physio_file, list): df_physio = [] for run, func in enumerate(self.physio_file): utils.verbose(f"Preprocessing {gb}{func}{end}", self.verbose) self.run = run+1 self.ses = None self.task = None if self.use_bids: bids_comps = utils.split_bids_components(func) for el in ['sub', 'run', 'ses', 'task']: if el in list(bids_comps.keys()): setattr(self, el, bids_comps[el]) # check if deleted_first_timepoints is list or not delete_first = check_input_is_list( self, var="deleted_first_timepoints", list_element=run, matcher="func_file") # check if deleted_last_timepoints is list or not delete_last = check_input_is_list( self, var="deleted_last_timepoints", list_element=run, matcher="func_file") if self.physio_mat is not None: if isinstance(self.physio_mat, list): if len(self.physio_mat) == len(self.physio_file): mat_file = self.physio_mat[run] else: raise ValueError( f"""Length of mat-files ({len(self.physio_mat)}) does not match length of physio-files ({len(self.physio_mat)})""") else: raise ValueError( "Please specify a list of mat-files of equal lengths to that of the list of physio files") else: mat_file = None self.preprocess_physio_file( func, physio_mat=mat_file, deleted_first_timepoints=delete_first, deleted_last_timepoints=delete_last) df_physio.append(self.get_physio(index=False)) self.df_physio = pd.concat(df_physio).set_index( ['subject', 'run', 't'])
[docs] def preprocess_physio_file( self, physio_tsv, physio_mat=None, deleted_first_timepoints=0, deleted_last_timepoints=0): self.physio_data = pd.read_csv( physio_tsv, header=None, sep="\t", engine='python', skiprows=deleted_first_timepoints, usecols=list(range(0, len(self.physio_cols)))) self.physio_df = pd.DataFrame(self.physio_data) self.physio_df.drop(self.physio_df.tail( deleted_last_timepoints).index, inplace=True) self.physio_df.columns = self.physio_cols # Try to get the heart rate if physio_mat is not None: self.mat = io.loadmat(physio_mat) try: self.hr = self.mat['physio']['ons_secs'][0][0][0][0][12] except Exception: print(" WARNING: no heart rate trace found..") try: self.rvt = self.mat['physio']['ons_secs'][0][0][0][0][13] except Exception: print(" WARNING: no respiration trace found..") # trim beginning and end for trace in ['hr', 'rvt']: if hasattr(self, trace): if deleted_last_timepoints != 0: self.physio_df[trace] = getattr( self, trace)[deleted_first_timepoints:-deleted_last_timepoints, :] else: self.physio_df[trace] = getattr( self, trace)[deleted_first_timepoints:, :] self.physio_df['subject'], self.physio_df['run'], self.physio_df['t'] = self.sub, self.run, list( self.TR*np.arange(self.physio_df.shape[0]))
[docs] def get_physio(self, index=True): if index: return self.physio_df.set_index(['subject', 'run', 't']) else: return self.physio_df
[docs] class ParseFuncFile(ParseExpToolsFile, ParsePhysioFile): """ParseFuncFile Class for parsing func-files created with Luisa's reconstruction. It can do filtering, conversion to percent signal change, and create power spectra. It is supposed to look similar to :class:`lazyfmri.dataset.ParseExpToolsFile` to make it easy to translate between the functional data and experimental data. Parameters ---------- func_file: str, list path or list of paths pointing to the output file of the experiment subject: int, optional subject number in the returned pandas DataFrame (should start with 1, ..., n) run: int, optional run number you'd like to have the onset times for baseline: float, int, optional Duration of the baseline used to calculate the percent-signal change. This method is the default over `psc_nilearn` baseline_units: str, optional Units of the baseline. Use `seconds`, `sec`, or `s` to imply that `baseline` is in seconds. We'll convert it to volumes internally. If `deleted_first_timepoints` is specified, `baseline` will be corrected for that as well. psc_nilearn: bool, optional Use nilearn method of calculating percent signal change. This method uses the mean of the entire timecourse, rather than the baseline period. Overwrites `baseline` and `baseline_units`. Default is False. standardize: str, optional method of standardization (e.g., "zscore" or "psc") low_pass: bool, optional Temporally smooth the data. It's a bit of a shame if this is needed lb: float, optional lower bound for signal filtering TR: float, optional repetition time to correct onset times for deleted volumes deleted_first_timepoints: int, list, optional number of volumes deleted at the beginning of the timeseries. Can be specified for each individual run if `func_file` is a list deleted_last_timepoints: int, list, optional number of volumes deleted at the end of the timeseries. Can be specified for each individual run if `func_file` is a list window_size: int, optional size of window for rolling median and Savitsky-Golay filter poly_order: int, optional The order of the polynomial used to fit the samples. polyorder must be less than window_length. use_bids: bool, optional If true, we'll read BIDS-components such as 'sub', 'run', 'task', etc from the input file and use those as indexers, rather than sequential 1,2,3. verbose: bool, optional Print details to the terminal, default is False retroicor: bool, optional WIP: implementation of retroicor, requires the specification of `phys_file` and `phys_mat` containing the output from the PhysIO-toolbox n_components: int, optional Number of components to use for WM/CSF PCA during ICA select_component: int, optional If `verbose=True` and `ICA=True`, we'll create a scree-plot of the PCA components. With this flag, you can re-run this call but regress out only this particular component. [Deprecated: `filter_confs` is much more effective] filter_confs: float, optional High-pass filter the components from the components during ICA. This seems to be pretty effective. Default is 0.2Hz. save_as: str, optional Directory + basename for several figures that can be created during the process transpose: bool, optional The data needs to be in the format of <time,voxels>. We'll be trying to force the input data into this format, but sometimes this breaks. This flag serves as an opportunity to flip whatever the default is for a particular input file (e.g., `gii`, `npy`, or `np.ndarray`), so that your final dataframe has the format it needs to have. For gifti-input, we transpose by default. `transpose=True` turns this transposing *off*. For `npy`-inputs, we do **NOT** transpose (we assume the numpy arrays are already in <time,voxels> format). `transpose=True` will transpose this input. report: bool: Save a bunch of figures along the process, including: - Eyetracking fidelity - ICA components - tSNR before/after ICA Directory used is <save_as>/figures + basename that is derived from BIDS components. Example ---------- .. code-block:: python from lazyfmri import utils, dataset func_file = utils.get_file_from_substring( f"run-1_bold.mat", opj('sub-001', 'ses-1', 'func') ) func = utils.ParseFuncFile( func_file, subject=1, run=1, deleted_first_timepoints=100, deleted_last_timepoints=300 ) raw = func.get_raw(index=True) psc = func.get_psc(index=True) """ def __init__( self, func_file, subject=1, run=1, filter_strategy="hp", TR=0.105, lb=0.01, deleted_first_timepoints=0, deleted_last_timepoints=0, window_size=11, poly_order=3, attribute_tag=None, hdf_key="df", tsv_file=None, edf_file=None, phys_file=None, phys_mat=None, use_bids=True, button=False, verbose=True, retroicor=False, ica=False, n_components=5, func_tag=None, select_component=None, standardization="psc", filter_confs=0.2, keep_comps=None, ses1_2_ls=None, run_2_run=None, save_as=None, gm_range=[355, 375], tissue_thresholds=[0.7, 0.7, 0.7], save_ext="svg", transpose=False, baseline=20, baseline_units="seconds", psc_nilearn=False, foldover="FH", report=False, **kwargs ): self.sub = subject self.run = run self.TR = TR self.lb = lb self.deleted_first_timepoints = deleted_first_timepoints self.deleted_last_timepoints = deleted_last_timepoints self.window_size = window_size self.poly_order = poly_order self.attribute_tag = attribute_tag self.hdf_key = hdf_key self.button = button self.func_file = func_file self.tsv_file = tsv_file self.edf_file = edf_file self.phys_file = phys_file self.phys_mat = phys_mat self.use_bids = use_bids self.verbose = verbose self.retroicor = retroicor self.foldover = foldover self.func_tag = func_tag self.n_components = n_components self.select_component = select_component self.filter_confs = filter_confs self.standardization = standardization self.ses1_2_ls = ses1_2_ls self.run_2_run = run_2_run self.save_as = save_as self.gm_range = gm_range self.tissue_thresholds = tissue_thresholds self.save_ext = save_ext self.filter_strategy = filter_strategy self.transpose = transpose self.baseline = baseline self.baseline_units = baseline_units self.psc_nilearn = psc_nilearn self.ica = ica self.keep_comps = keep_comps self.report = report self.__dict__.update(kwargs) # sampling rate and nyquist freq self.fs = 1/self.TR self.fn = self.fs/2 if self.phys_file is not None: ParsePhysioFile.__init__( self, self.phys_file, physio_mat=self.phys_mat, use_bids=self.use_bids, TR=self.TR, deleted_first_timepoints=self.deleted_first_timepoints, deleted_last_timepoints=self.deleted_last_timepoints, **kwargs ) utils.verbose("\nFUNCTIONAL", self.verbose) if isinstance(self.func_file, (str, np.ndarray)): self.func_file = [self.func_file] # store settings self.func_settings = {} self.incl_settings = [ "TR", "lb", "deleted_first_timepoints", "deleted_last_timepoints", "window_size", "poly_order", "button", "func_file", "tsv_file", "edf_file", "verbose", "retroicor", "foldover", "func_tag", "n_components", "select_component", "filter_confs", "standardization", "ses1_2_ls", "run_2_run", "save_as", "gm_range", "tissue_thresholds", "filter_strategy", "baseline", "baseline_units", "psc_nilearn", "ica", "keep_comps" ] for key in self.incl_settings: self.func_settings[key] = getattr(self, key) # check if we should index task self.index_task = False self.index_list = ["subject", "run", "t"] if self.use_bids: self.task_ids = utils.get_ids(self.func_file, bids="task") if len(self.task_ids) > 1: self.index_task = True self.index_list = ["subject", "task", "run", "t"] if isinstance(self.func_file, list): # initiate some dataframes self.df_psc = [] # psc-data (filtered or not) self.df_raw = [] # raw-data (filtered or not) # z-score data (retroicor'ed, `if retroicor=True`) self.df_retro = [] # r2 for portions of retroicor-regressors (e.g., 'all', 'cardiac', etc) self.df_r2 = [] self.df_zscore = [] # zscore-d data self.df_ica = [] # ica'ed data self.ica_objs = [] # keep track of all ICA objects # reports for run_id, func in enumerate(self.func_file): if isinstance(func, str): utils.verbose(f"Preprocessing {gb}{func}{end}", self.verbose) elif isinstance(func, np.ndarray): utils.verbose( f"Preprocessing {gb}array {run_id+1}{end} in list", self.verbose) # override use_bids. Can't be use with numpy arrays self.use_bids = False else: raise ValueError( f"Unknown input type '{type(func)}'. Must be string or numpy-array") self.run = run_id+1 self.ses = None self.task = None if self.use_bids: if isinstance(func, str): bids_comps = utils.split_bids_components(func) for el in ['sub', 'run', 'ses', 'task']: if el in list(bids_comps.keys()): setattr(self, el, bids_comps[el]) # set base name based on presence of bids tags self.base_name = f"sub-{self.sub}" if isinstance(self.ses, (str, float, int)): self.base_name += f"_ses-{self.ses}" if isinstance(self.task, str): self.base_name += f"_task-{self.task}" # check if deleted_first_timepoints is list or not delete_first = check_input_is_list( self, var="deleted_first_timepoints", list_element=run_id, matcher="func_file") # check if deleted_last_timepoints is list or not delete_last = check_input_is_list( self, var="deleted_last_timepoints", list_element=run_id, matcher="func_file") # check if baseline is list or not baseline = check_input_is_list( self, var="baseline", list_element=run_id, matcher="func_file") utils.verbose( f" Filtering strategy: '{rb}{self.filter_strategy}{end}'", self.verbose) utils.verbose( f" Standardization strategy: '{rb}{self.standardization}{end}'", self.verbose) self.preprocess_func_file( func, run=self.run, task=self.task, deleted_first_timepoints=delete_first, deleted_last_timepoints=delete_last, baseline=baseline, **kwargs ) if self.standardization == "psc": self.df_psc.append( self.get_data( index=False, filter_strategy=self.filter_strategy, dtype='psc', ica=self.ica ) ) elif self.standardization == "zscore": self.df_zscore.append( self.get_data( index=False, filter_strategy=self.filter_strategy, dtype='zscore', ica=False ) ) self.df_raw.append( self.get_data( index=False, filter_strategy=None, dtype='raw')) if self.retroicor: self.df_retro.append(self.get_retroicor(index=False)) self.df_r2.append(self.r2_physio_df) if self.ica: self.df_ica.append( self.get_data( index=False, filter_strategy=self.filter_strategy, dtype=self.standardization, ica=True ) ) self.ica_objs.append(self.ica_obj) # check for standardization method if self.standardization == "psc": self.df_func_psc = pd.concat(self.df_psc) elif self.standardization == "zscore": self.df_func_zscore = pd.concat(self.df_zscore) # we'll always have raw data self.df_func_raw = pd.concat(self.df_raw) if self.retroicor: try: self.df_func_retroicor = pd.concat( self.df_retro).set_index(self.index_list) self.df_physio_r2 = pd.concat(self.df_r2) except Exception: raise ValueError( "RETROICOR did not complete successfully..") if self.ica: # check if elements of list contain dataframes if all(elem is None for elem in self.df_ica): utils.verbose( "WARNING: ICA did not execute properly. All runs have 'None'", True) else: try: self.df_func_ica = pd.concat( self.df_ica).set_index(self.index_list) except Exception: self.df_func_ica = pd.concat(self.df_ica) # now that we have nicely formatted functional data, initialize the ParseExpToolsFile-class if self.tsv_file is not None: ParseExpToolsFile.__init__( self, self.tsv_file, subject=self.sub, deleted_first_timepoints=self.deleted_first_timepoints, TR=self.TR, edfs=self.edf_file, funcs=self.func_file, use_bids=self.use_bids, button=self.button, verbose=self.verbose, invoked_from_func=True, save_as=self.save_as, **kwargs )
[docs] def preprocess_func_file( self, func_file, run=1, task=None, deleted_first_timepoints=0, deleted_last_timepoints=0, baseline=None, **kwargs ): # ---------------------------------------------------------------------------------------------------------------------------------------------- # BASIC DATA LOADING # Load in datasets with tag "wcsmtSNR" self.stop_process = False if isinstance(func_file, str): if func_file.endswith("mat"): # load matlab file self.ts_wcsmtSNR = io.loadmat(func_file) # decide which key to read from the .mat file if self.func_tag is None: self.tag = list(self.ts_wcsmtSNR.keys())[-1] else: self.tag = self.func_tag # select data self.ts_wcsmtSNR = self.ts_wcsmtSNR[self.tag] self.ts_complex = self.ts_wcsmtSNR self.ts_magnitude = np.abs(self.ts_wcsmtSNR) elif func_file.endswith('gii'): self.gif_obj = ParseGiftiFile(func_file) self.ts_magnitude = self.gif_obj.data if not self.transpose: self.ts_magnitude = self.gif_obj.data.T elif func_file.endswith("npy"): self.ts_magnitude = np.load(func_file) if self.transpose: self.ts_magnitude = np.load(func_file).T elif func_file.endswith("nii") or func_file.endswith("gz"): # read niimg nimg = nb.load(func_file) fdata = nimg.get_fdata() # read TR from header if not isinstance(nimg, nb.cifti2.cifti2.Cifti2Image): self.hdr = nimg.header self.affine = nimg.affine self.TR = self.hdr["pixdim"][4] # cifti nii's are already 2D if fdata.ndim > 2: self.orig_dim = fdata.shape xdim, ydim, zdim, time_points = fdata.shape self.ts_magnitude = fdata.reshape( xdim*ydim*zdim, time_points) else: self.ts_magnitude = fdata.copy().T elif func_file.endswith("pkl"): with open(func_file, 'rb') as handle: df = pickle.load(handle) setattr(self, f"data_{self.standardization}_df", df) setattr(self, "data_raw_df", df) self.stop_process = True elif isinstance(func_file, np.ndarray): self.ts_magnitude = func_file.copy() else: raise NotImplementedError( f"Input type {type(func_file)} not supported") if not self.stop_process: # check baseline if not self.psc_nilearn: if self.baseline_units in ["seconds", "s", "sec"]: baseline_vols_old = int(np.round(baseline*self.fs, 0)) utils.verbose( f" Baseline is {rb}{baseline}{end} seconds, or {rb}{baseline_vols_old}{end} TRs", self.verbose) else: baseline_vols_old = baseline utils.verbose(f" Baseline is {rb}{baseline}{end} TRs", self.verbose) # correct for deleted samples baseline_vols = baseline_vols_old-self.deleted_first_timepoints txt = f" (also cut from baseline (was {baseline_vols_old}, now {baseline_vols} TRs)" else: txt = "" baseline_vols = 0 # trim beginning and end if deleted_last_timepoints != 0: self.ts_corrected = self.ts_magnitude[:, deleted_first_timepoints:-deleted_last_timepoints] else: self.ts_corrected = self.ts_magnitude[:, deleted_first_timepoints:] utils.verbose( f" Cutting {rb}{deleted_first_timepoints}{end} vols from beginning{txt} | {rb}{deleted_last_timepoints}{end} vols from end", self.verbose) self.vox_cols = [f'vox {x}' for x in range( self.ts_corrected.shape[0])] # ---------------------------------------------------------------------------------------------------------------------------------------------- # STANDARDIZATION OF UNFILTERED DATA & CREATE DATAFRAMES # dataframe of raw, unfiltered data self.data_raw = self.ts_corrected.copy() self.data_raw_df = self.index_func( self.data_raw, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) # dataframe of unfiltered PSC-data self.data_psc = utils.percent_change( self.data_raw, 1, baseline=baseline_vols, nilearn=self.psc_nilearn ) self.data_psc_df = self.index_func( self.data_psc, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) # dataframe of unfiltered z-scored data self.data_zscore = (self.data_raw-self.data_raw.mean(axis=1, keepdims=True))/self.data_raw.std(axis=1, keepdims=True) self.data_zscore_df = self.index_func( self.data_zscore, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True) # ---------------------------------------------------------------------------------------------------------------------------------------------- # HIGH PASS FILTER self.clean_tag = None if self.filter_strategy != "raw": utils.verbose( f" DCT-high pass filter [removes low frequencies <{rb}{self.lb}{end} Hz] to correct low-frequency drifts.", self.verbose) # highpass data self.hp_raw, self._cosine_drift = preproc.highpass_dct( self.data_raw, lb=self.lb, TR=self.TR ) # index df self.hp_raw_df = self.index_func( self.hp_raw, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) # dataframe of high-passed PSC-data (set NaN to 0) self.hp_psc = np.nan_to_num( utils.percent_change( self.hp_raw, 1, baseline=baseline_vols, nilearn=self.psc_nilearn ) ) # index df self.hp_psc_df = self.index_func( self.hp_psc, columns=self.vox_cols, subject=self.sub, run=run, task=task, TR=self.TR, set_index=True ) # dataframe of high-passed z-scored data self.hp_zscore = (self.hp_raw-self.hp_raw.mean(axis=1, keepdims=True))/self.hp_raw.std(axis=1, keepdims=True) # index df self.hp_zscore_df = self.index_func( self.hp_zscore, columns=self.vox_cols, subject=self.sub, run=run, TR=self.TR, task=task, set_index=True ) # save SD and Mean so we can go from zscore back to original self.zscore_SD = self.hp_raw.std(axis=-1, keepdims=True) self.zscore_M = self.hp_raw.mean(axis=-1, keepdims=True) if self.ica: # run ICA self.run_ica( task=task, save_as=self.save_as ) self.clean_tag = "ica" self.clean_data = self.ica_obj.ica_data if hasattr(self, "clean_data"): self.tmp_df = self.index_func( self.clean_data, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) setattr(self, f"hp_{self.clean_tag}_df", self.tmp_df) # multiply by SD and add mean self.tmp_raw = (self.clean_data * self.zscore_SD) + self.zscore_M setattr(self, f"hp_{self.clean_tag}_raw", self.tmp_raw) self.tmp_raw_df = self.index_func( self.tmp_raw, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) setattr(self, f"hp_{self.clean_tag}_raw_df", self.tmp_raw) # make percent signal self.hp_tmp_psc = np.nan_to_num( utils.percent_change( self.tmp_raw, 1, baseline=baseline_vols, nilearn=self.psc_nilearn ) ) setattr(self, f"hp_{self.clean_tag}_psc", self.hp_tmp_psc) self.hp_tmp_psc_df = self.index_func( self.hp_tmp_psc, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) setattr(self, f"hp_{self.clean_tag}_psc_df", self.hp_tmp_psc_df) # -------------------------------------------------------------------------------- # LOW PASS FILTER if "lp" in self.filter_strategy: if self.ica: info = f" Using {self.clean_tag}-data for low-pass filtering" data_for_filtering = self.get_data( index=True, filter_strategy="hp", dtype=self.standardization, ica=self.ica ).T.values out_attr = f"lp_{self.clean_tag}_{self.standardization}" elif hasattr(self, f"hp_{self.standardization}"): info = " Using high-pass filtered data for low-pass filtering" data_for_filtering = getattr( self, f"hp_{self.standardization}") out_attr = f"lp_{self.standardization}" else: info = " Using unfiltered data for low-pass filtering" data_for_filtering = getattr( self, f"data_{self.standardization}") out_attr = f"lp_data_{self.standardization}" utils.verbose(info, self.verbose) utils.verbose( f" Savitsky-Golay low-pass filter [removes high freqs] (w={rb}{self.window_size}{end}, o={rb}{self.poly_order}{end})", self.verbose) tmp_filtered = preproc.lowpass_savgol( data_for_filtering, window_length=self.window_size, polyorder=self.poly_order ) tmp_filtered_df = self.index_func( tmp_filtered, columns=self.vox_cols, subject=self.sub, task=task, run=run, TR=self.TR, set_index=True ) setattr(self, out_attr, tmp_filtered.copy()) setattr(self, f'{out_attr}_df', tmp_filtered_df.copy()) else: self.clean_tag = None # get basic qualities self.basic_qa( self.ts_corrected, run=run, save_as=self.save_as, make_figure=self.report )
[docs] def to_nifti(self, func, fname=None): func_res = func.reshape(self.orig_dim) print(func_res.shape) niimg = nb.Nifti1Image(func_res, affine=self.affine, header=self.hdr) if isinstance(fname, str): niimg.to_filename(fname) return fname else: return niimg
[docs] def run_ica( self, task=None, save_as=None ): utils.verbose( f" Running FastICA with {rb}{self.n_components}{end} components", self.verbose) self.ica_obj = preproc.ICA( self.hp_zscore_df, subject=f"sub-{self.sub}", ses=self.ses, run=self.run, task=task, n_components=self.n_components, TR=self.TR, filter_confs=self.filter_confs, keep_comps=self.keep_comps, verbose=self.verbose, zoom_freq=True, ribbon=tuple(self.gm_range), save_as=save_as, summary_plot=self.report ) # regress self.ica_obj.regress()
[docs] def basic_qa( self, data, run=1, make_figure=False, save_as=None ): # tsnr tsnr_pre = utils.calculate_tsnr(data, -1) mean_tsnr_pre = float(np.nanmean(np.ravel(tsnr_pre))) # variance var_pre = np.var(data, axis=-1) mean_var_pre = float(var_pre.mean()) tsnr_inputs = [tsnr_pre] var_inputs = [var_pre] colors = "#1B9E77" tsnr_lbl = None var_lbl = None tsnr_lines = { "pos": mean_tsnr_pre, "color": colors } var_lines = { "pos": mean_var_pre, "color": colors } if self.verbose: if self.clean_tag is None: info = "no cleaning" else: info = f"before '{self.clean_tag}'" utils.verbose( f" tSNR [{info}]: {rb}{round(mean_tsnr_pre,2)}{end}\t| variance: {rb}{round(mean_var_pre,2)}{end}", self.verbose) if self.clean_tag in ["ica"]: # get aCompCor/ICA'ed tSNR tsnr_post = utils.calculate_tsnr( getattr(self, f"hp_{self.clean_tag}_raw"), -1) mean_tsnr_post = float(np.nanmean(np.ravel(tsnr_post))) # variance var_post = np.var( getattr(self, f"hp_{self.clean_tag}_raw"), axis=-1) mean_var_post = float(var_post.mean()) # sort out plotting stuff tsnr_inputs += [tsnr_post] var_inputs += [var_post] colors = [colors, "#D95F02"] tsnr_lbl = [ f'no {self.clean_tag} [{round(mean_tsnr_pre,2)}]', f'{self.clean_tag} [{round(mean_tsnr_post,2)}]'] var_lbl = [ f'no {self.clean_tag} [{round(mean_var_pre,2)}]', f'{self.clean_tag} [{round(mean_var_post,2)}]'] tsnr_lines = { "pos": [mean_tsnr_pre, mean_tsnr_post], "color": colors } var_lines = { "pos": [mean_var_pre, mean_var_post], "color": colors } utils.verbose( f" tSNR [after '{self.clean_tag}']: {rb}{round(mean_tsnr_post,2)}{end}\t| variance: {rb}{round(mean_var_post,2)}{end}", self.verbose) if make_figure: # initiate figure fig = plt.figure(figsize=(24, 7)) gs = fig.add_gridspec(1, 3, width_ratios=[10, 10, 10]) # imshow for apparent motion ax1 = fig.add_subplot(gs[0]) ax1.imshow(np.rot90(data.T), aspect=8/1) vox_ticks = [0, data.shape[0]//2, data.shape[0]] ax1 = plotting.conform_ax_to_obj( ax1, x_label="volumes", y_label="voxels", title="Stability of position", font_size=16, x_ticks=[0, data.shape[-1]], y_ticks=vox_ticks ) # line plot for tSNR over voxels ax2 = fig.add_subplot(gs[1]) plotting.LazyLine( tsnr_inputs, axs=ax2, title="tSNR across the line", font_size=16, linewidth=2, color=colors, x_label="voxels", y_label="tSNR (a.u.)", labels=tsnr_lbl, add_hline=tsnr_lines, x_ticks=vox_ticks, line_width=2) ax3 = fig.add_subplot(gs[2]) plotting.LazyLine( var_inputs, axs=ax3, title="Variance across the line", font_size=16, linewidth=2, color=colors, x_label="voxels", y_label="Variance", labels=var_lbl, add_hline=var_lines, x_ticks=vox_ticks, line_width=2 ) plt.close(fig) if self.report: if save_as is None: save_as = opj(os.getcwd(), "figures") if not os.path.exists(save_as): os.makedirs(save_as, exist_ok=True) # save fname = opj(save_as, f"{self.base_name}_run-{run}_desc-qa.{self.save_ext}") utils.verbose(f" Writing {bm}{fname}{end}", self.verbose) fig.savefig(fname, bbox_inches='tight', dpi=300)
[docs] def get_data( self, filter_strategy=None, index=False, dtype="psc", ica=False ): if dtype not in ["psc", "zscore", "raw"]: raise ValueError( f"Requested data type '{dtype}' is not supported. Use 'psc', 'zscore', or 'raw'") return_data = None allowed = [None, "raw", "hp", "lp"] if ica: tag = f"{self.clean_tag}_{dtype}" else: tag = dtype if filter_strategy is None or filter_strategy == "raw": attr = f"data_{tag}_df" elif filter_strategy == "lp": attr = f"lp_{tag}_df" elif filter_strategy == "hp": attr = f"hp_{tag}_df" else: raise ValueError( f"Unknown attribute '{filter_strategy}'. Must be one of: {allowed}") if hasattr(self, attr): # print(f" Fetching attribute: {attr}") return_data = getattr(self, attr) else: raise ValueError( f"{self} does not have an attribute called '{attr}'") if isinstance(return_data, pd.DataFrame): if index: try: return return_data.set_index(['subject', 'run', 't']) except Exception: return return_data else: return return_data
[docs] def index_func( self, array, columns=None, subject=1, run=1, task=None, TR=0.105, set_index=False): if columns is None: df = pd.DataFrame(array.T) else: df = pd.DataFrame(array.T, columns=columns) df['subject'] = subject df['run'] = run df['t'] = list(TR*np.arange(df.shape[0])) if self.index_task: if isinstance(task, str): df["task"] = task else: df["task"] = "task" if set_index: return df.set_index(self.index_list) else: return df
[docs] class Dataset(ParseFuncFile, SetAttributes): """Dataset Main class for retrieving, formatting, and preprocessing of all datatypes including fMRI (2D), eyetracker (.edf), physiology (.log [WIP]), and experiment files derived from `Exptools2` (.tsv). If you leave `subject` and `run` empty, these elements will be derived from the file names. So if you have BIDS-like files, leave them empty and the dataframe will be created for you with the correct subject/run IDs. Inherits from :class:`lazyfmri.dataset.ParseFuncFile`, so all arguments from that class are available and are passed on via `kwargs`. Only `func_file` and `verbose` are required. The first one is necessary because if the input is an *h5*-file, we'll set the attributes accordingly. Otherwise :class:`lazyfmri.dataset.ParseFuncFile` is invoked. `verbose` is required for aesthetic reasons. Given that :class:`lazyfmri.dataset.ParseFuncFile` inherits in turn from :class:`lazyfmri.dataset.ParseExpToolsFile`, you can pass the arguments for that class here as well. Parameters ---------- func_file: str, list path or list of paths pointing to the output file of the experiment verbose: bool, optional Print details to the terminal, default is False Example ---------- .. code-block:: python from fmriproc import dataset from lazyfmri import utils func_dir = "/some/dir" exp = utils.get_file_from_substring("tsv", func_dir) funcs = utils.get_file_from_substring("bold.mat", func_dir) # only cut from SR-runs delete_first = 100 delete_last = 0 window = 19 order = 3 data = dataset.Dataset( funcs, deleted_first_timepoints=delete_first, deleted_last_timepoints=delete_last, tsv_file=exp, verbose=True) # retrieve data fmri = data.fetch_fmri() onsets = data.fetch_onsets() """ def __init__( self, func_file, verbose=False, **kwargs ): self.verbose = verbose utils.verbose("DATASET", self.verbose) # set attributes SetAttributes.__init__(self) if isinstance(func_file, str) and func_file.endswith(".h5"): self.from_hdf(func_file) # set all kwargs # print(kwargs) self.__dict__.update(kwargs) else: ParseFuncFile.__init__( self, func_file, verbose=self.verbose, **kwargs ) utils.verbose("\nDATASET: created", self.verbose)
[docs] def fetch_fmri(self, strip_index=False, dtype=None): if dtype is None: if hasattr(self, "ica"): if self.ica: dtype = "ica" elif hasattr(self, "standardization"): dtype = self.standardization else: dtype = "psc" if dtype == "psc": attr = 'df_func_psc' elif dtype == "retroicor": attr = 'df_func_retroicor' elif dtype == "raw" or dtype is None: attr = 'df_func_raw' elif dtype == "zscore": attr = 'df_func_zscore' elif dtype == "ica": attr = 'df_func_ica' else: raise ValueError( f"Unknown option '{dtype}'. Must be 'psc', 'retroicor', or 'zscore'") if hasattr(self, attr): utils.verbose( f"Fetching dataframe from attribute '{rb}{attr}{end}'", self.verbose) df = getattr(self, attr) if strip_index: return df.reset_index().drop(labels=['subject', 'run', 't'], axis=1) else: return df else: utils.verbose(f"Could not find '{attr}' attribute", True)
[docs] def fetch_onsets(self, strip_index=False, button=True): if hasattr(self, 'df_onsets'): if strip_index: df = self.df_onsets.reset_index().drop( labels=list(self.df_onsets.index.names), axis=1) else: df = self.df_onsets # filter out button if not button: df = utils.select_from_df( df, expression="event_type != response") return df else: utils.verbose("No event-data was provided", True)
[docs] def fetch_rts(self, strip_index=False): if hasattr(self, 'df_rts'): if strip_index: return self.df_rts.reset_index().drop(labels=list(self.df_rts.index.names), axis=1) else: return self.df_rts else: utils.verbose("No reaction times were provided", True)
[docs] def fetch_accuracy(self, strip_index=False): if hasattr(self, 'df_accuracy'): if strip_index: return self.df_accuracy.reset_index().drop(labels=list(self.df_accuracy.index.names), axis=1) else: return self.df_accuracy else: utils.verbose("No accuracy measurements found", True)
[docs] def fetch_physio(self, strip_index=False): if hasattr(self, 'df_physio'): if strip_index: return self.df_physio.reset_index().drop(labels=list(self.df_physio.index.names), axis=1) else: return self.df_physio else: utils.verbose("No physio-data was provided", True)
[docs] def fetch_trace(self, strip_index=False): if hasattr(self, 'df_space_func'): if strip_index: return self.df_space_func.reset_index().drop(labels=list(self.df_space_func.index.names), axis=1) else: return self.df_space_func else: utils.verbose("No eyetracking-data was provided", True)
[docs] def from_hdf(self, input_file=None): if not isinstance(input_file, str): raise ValueError("No output file specified") else: self.h5_file = input_file if not os.path.exists(self.h5_file): raise FileNotFoundError(f"Could not find file: '{self.h5_file}'") utils.verbose(f"Reading from {gb}{self.h5_file}{end}", self.verbose) hdf_store = pd.HDFStore(self.h5_file) hdf_keys = hdf_store.keys() for key in hdf_keys: key = key.strip("/") try: setattr(self, key, hdf_store.get(key)) utils.verbose(f" Set attribute: {rb}{key}{end}", self.verbose) except Exception: utils.verbose( f" Could not set attribute '{key}'", self.verbose) hdf_store.close()
[docs] def to_hdf( self, h5_file=None, overwrite=False, ): # make the directory if not os.path.exists(os.path.dirname(h5_file)): os.makedirs(os.path.dirname(h5_file)) if overwrite: if os.path.exists(h5_file): store = pd.HDFStore(h5_file) store.close() os.remove(h5_file) utils.verbose(f"Saving to {bm}{h5_file}{end}", self.verbose) for attr in self.all_attributes: if hasattr(self, attr): add_df = getattr(self, attr) # try regular storing if isinstance(add_df, pd.DataFrame): try: add_df.to_hdf( h5_file, key=attr, append=True, mode='a', format='t' ) utils.verbose(f" Stored attribute: {gb}{attr}{end}", self.verbose) except Exception: # send error message utils.verbose(f" Could not store attribute '{attr}'", self.verbose) # define json file self.json_file = h5_file.split(".")[0]+".json" if os.path.exists(self.json_file): os.remove(self.json_file) # Serializing json json_object = json.dumps(self.func_settings, indent=4) # Writing to sample.json with open(self.json_file, "w") as outfile: outfile.write(json_object) utils.verbose("Done", self.verbose) store = pd.HDFStore(h5_file) store.close()
[docs] def to4D(self, fname=None, desc=None, dtype=None, mask=None): # get dataset df = self.fetch_fmri(dtype=dtype) subj_list = self.get_subjects(df) file_counter = 0 for sub in subj_list: # get subject-specific data data_per_subj = utils.select_from_df( df, expression=f"subject = {sub}") # get run IDs n_runs = self.get_runs(df) for run in n_runs: # get run-specific data data_per_run = utils.select_from_df( data_per_subj, expression=f"run = {run}") # get corresponding reference image from self.func_file either based on index (if use_bids=False), or based on # BIDS-elements (use_bids=True) if self.use_bids: ref_img = utils.get_file_from_substring( [f'sub-{sub}', f'run-{run}'], self.func_file) else: ref_img = self.func_file[file_counter] utils.verbose(f"Ref img = {gb}{ref_img}{end}", self.verbose) if isinstance(ref_img, nb.Nifti1Image): ref_img = ref_img elif isinstance(ref_img, str): if ref_img.endswith("gz") or ref_img.endswith("nii"): ref_img = nb.load(ref_img) else: raise TypeError( f"Unknown reference type '{ref_img}'. Must be a string pointing to 'nii' or 'nii.gz' file") else: raise ValueError( "'ref_img' must either be string pointing to nifti image or a nb.Nifti1Image object") # get information of reference image dims = ref_img.get_fdata().shape aff = ref_img.affine hdr = ref_img.header utils.verbose(f"Ref shape = {rb}{dims}{end}", self.verbose) data_per_run = data_per_run.values # time is initially first axis, so transpose if data_per_run.shape[-1] != dims[-1]: utils.verbose( f"Data shape = {rb}{data_per_run.shape}{end}; transposing..", self.verbose) data_per_run = data_per_run.T else: utils.verbose( f"Data shape = {rb}{data_per_run.shape}{end}; all good..", self.verbose) utils.verbose( f"Final shape = {rb}{data_per_run.shape}{end}", self.verbose) # check if we have mask if isinstance(mask, nb.Nifti1Image) or isinstance(mask, str) or isinstance(mask, list): utils.verbose( "Masking with given mask-object", self.verbose) if isinstance(mask, nb.Nifti1Image): mask = mask elif isinstance(mask, str): if mask.endswith("gz") or mask.endswith("nii"): mask = nb.load(mask) else: raise TypeError( f"Unknown reference type '{mask}'. Must be a string pointing to 'nii' or 'nii.gz' file") elif isinstance(mask, list): # select mask based on BIDS-components or index if self.use_bids: mask = utils.get_file_from_substring( [f'sub-{sub}', f'run-{run}'], mask) else: mask = mask[file_counter] else: raise TypeError( f"""Unknown input '{type(mask)}', must be nibabel.Nifti1Image-object or string pointing to nifti-image""") # mask array mask_data = mask.get_fdata() mask_data = mask_data.reshape(np.prod(mask_data.shape)) brain_idc = np.where(mask_data > 0)[0] data_masked = np.zeros_like(data_per_run) # fill zeroed array with brandata data_masked[brain_idc, :] = data_per_run[brain_idc, :] # overwrite data_per_run = data_masked.copy() # reshape data_per_run = data_per_run.reshape(*dims) # save if not isinstance(fname, str): if isinstance(desc, str): fname = f"sub-{sub}_run-{run}_desc-{desc}.nii.gz" else: fname = f"sub-{sub}_run-{run}.nii.gz" else: if isinstance(desc, str): fname = f"{fname}_run-{run}_desc-{desc}.nii.gz" else: fname = f"{fname}_run-{run}.nii.gz" utils.verbose(f"Writing {gb}{fname}{end}", self.verbose) nb.Nifti1Image(data_per_run, affine=aff, header=hdr).to_filename(fname) file_counter += 1
[docs] class DatasetCollector(): def __init__(self, dataset_objects): self.datasets = dataset_objects if len(self.datasets) is not None: self.data = [] self.onsets = [] for dataset in self.datasets: self.data.append(dataset.fetch_fmri()) # check if we got onsets if hasattr(dataset, 'df_onsets'): onsets = True self.onsets.append(dataset.fetch_onsets()) else: onsets = False self.data = pd.concat(self.data) if onsets: self.onsets = pd.concat(self.onsets)
# this is basically a wrapper around pybest.utils.load_gifti
[docs] class ParseGiftiFile(): """ParseGiftiFile Read a gifti-file into a dataframe similar to :class:`lazyfmri.dataset.ParseFuncFile`. Also allows you to set the RepetitionTime in the metadata and rewrite the gifti-file. The final data is set as ``self.data``, representing the `numpy.ndarray`-form of the gifti-file. Parameters ---------- gifti_file: str Path pointing to gifti-file set_tr: int, float, optional Set the TR in milliseconds in the metadata, by default None \*gii_args, \*gii_kwargs: dict Arguments passed to `nb.gifti.GiftiDataArray` Raises ---------- ValueError If ``gifti-file`` does not end with ".gii". Example ---------- .. code-block:: python from lazyfmri import dataset gii_file = "sub-01_ses-1_task-rest_space-fsnative_bold.gii" obj = dataset.ParseGiftiFile(gii_file) data = obj.data """ def __init__(self, gifti_file, set_tr=None, *gii_args, **gii_kwargs): if isinstance(gifti_file, str): self.gifti_file = gifti_file self.f_gif = nb.load(self.gifti_file) self.data = np.vstack([arr.data for arr in self.f_gif.darrays]) elif isinstance(gifti_file, np.ndarray): self.gifti_file = None self.data = gifti_file else: raise ValueError("Input must be a string ending with '.gii' or a numpy array") # get TR self.set_tr = set_tr if isinstance(self.set_tr, (int, float)): self.meta_obj = self.set_metadata(tr=self.set_tr) self.TR_ms = float(self.meta_dict['TimeStep']) self.TR_sec = float(self.meta_dict['TimeStep']) / 1000 # overwrite original file if isinstance(self.gifti_file, str): self.write_file(self.gifti_file, *gii_args, **gii_kwargs) else: if len(self.f_gif.darrays[0].metadata) > 0: self.TR_ms = float(self.f_gif.darrays[0].metadata['TimeStep']) self.TR_sec = float( self.f_gif.darrays[0].metadata['TimeStep']) / 1000
[docs] def set_metadata(self, tr=None): self.meta_dict = {'TimeStep': str(float(tr))} return nb.gifti.GiftiMetaData().from_dict(self.meta_dict)
[docs] def get_tr(self, units="sec"): if units not in ["sec", "ms"]: raise ValueError( f"units must be one of 'sec' or 'ms', not '{units}'") if hasattr(self, f"TR_{units}"): return getattr(self, f"TR_{units}") else: return None
[docs] def write_file( self, filename, tr=None, **kwargs ): metadata = None if not isinstance(tr, (int, float)): if hasattr(self, "meta_obj"): metadata = self.meta_obj else: metadata = self.set_metadata(tr=tr) self.TR_ms = tr self.TR_sec = tr/1000 # copy old data and combine it with metadata darray = nb.gifti.GiftiDataArray( self.data, meta=metadata, **kwargs ) # store in new gifti image object gifti_image = nb.GiftiImage() # add data to this object gifti_image.add_gifti_data_array(darray) # save in same file name nb.save(gifti_image, filename)