diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index e0dba921..ae22664b 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -68,7 +68,7 @@ 'R7D1': None, 'R9K3': None, 'SM54': None, - 'T54A': None, + 'T54A': 'PipelineTeamT54A', 'U26C': None, 'UI76': None, 'UK24': None, diff --git a/narps_open/pipelines/team_T54A.py b/narps_open/pipelines/team_T54A.py index 1bb10dcd..ae34b848 100644 --- a/narps_open/pipelines/team_T54A.py +++ b/narps_open/pipelines/team_T54A.py @@ -1,630 +1,824 @@ -from nipype.interfaces.fsl import (BET, ICA_AROMA, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, - Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel, - L2Model, Merge, FLAMEO, ContrastMgr,Cluster, FILMGLS, Randomise, MultipleRegressDesign) -from nipype.algorithms.modelgen import SpecifyModel +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team T54A using Nipype """ + +from os.path import join +from itertools import product -from niflow.nipype1.workflows.fmri.fsl import create_susan_smooth -from nipype.interfaces.utility import IdentityInterface, Function -from nipype.interfaces.io import SelectFiles, DataSink -from nipype.algorithms.misc import Gunzip from nipype import Workflow, Node, MapNode -from nipype.interfaces.base import Bunch - -from os.path import join as opj -import os - -def get_session_infos(event_file): - ''' - Create Bunchs for specifyModel. - - Parameters : - - event_file : str, file corresponding to the run and the subject to analyze - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - from os.path import join as opj - from nipype.interfaces.base import Bunch - import numpy as np - - cond_names = ['trial', 'gain', 'loss', 'difficulty','response'] - - onset = {} - duration = {} - amplitude = {} - - for c in cond_names: # For each condition. - onset.update({c : []}) # creates dictionary items with empty lists - duration.update({c : []}) - amplitude.update({c : []}) - - with open(event_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - # Creates list with onsets, duration and loss/gain for amplitude (FSL) - for c in cond_names: +from nipype.interfaces.utility import IdentityInterface, Function, Split +from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.fsl import ( + BET, IsotropicSmooth, Level1Design, FEATModel, L2Model, Merge, FLAMEO, + FILMGLS, Randomise, MultipleRegressDesign, FSLCommand + ) +from nipype.algorithms.modelgen import SpecifyModel +from nipype.interfaces.fsl.maths import MultiImageMaths + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import list_intersection, elements_in_string, clean_list +from narps_open.core.interfaces import InterfaceFactory + +# Setup FSL +FSLCommand.set_default_output_type('NIFTI_GZ') + +class PipelineTeamT54A(Pipeline): + """ A class that defines the pipeline of team T54A """ + + def __init__(self): + super().__init__() + self.fwhm = 4.0 + self.team_id = 'T54A' + self.contrast_list = ['1', '2'] + self.run_level_contrasts = [ + ('gain', 'T', ['trial', 'gain', 'loss'], [0, 1, 0]), + ('loss', 'T', ['trial', 'gain', 'loss'], [0, 0, 1]) + ] + + def get_preprocessing(self): + """ No preprocessing has been done by team T54A """ + return None + + def get_subject_information(event_file): + """ + Create Bunchs for specifyModel. + + Parameters : + - event_file : str, file corresponding to the run and the subject to analyze + + Returns : + - subject_info : list of Bunch for 1st level analysis. + """ + from nipype.interfaces.base import Bunch + + condition_names = ['trial', 'gain', 'loss', 'difficulty', 'response', 'missed'] + onsets = {} + durations = {} + amplitudes = {} + + for condition in condition_names: + # Create dictionary items with empty lists + onsets.update({condition : []}) + durations.update({condition : []}) + amplitudes.update({condition : []}) + + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + if info[5] != 'NoResp': - if c == 'gain': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(info[2])) - elif c == 'loss': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(info[3])) - elif c == 'trial': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(1)) - elif c == 'difficulty': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(abs(0.5 * float(info[2]) - float(info[3]))) - elif c == 'response': - onset[c].append(float(info[0]) + float(info[4])) - duration[c].append(float(0)) - amplitude[c].append(float(1)) + onsets['trial'].append(float(info[0])) + durations['trial'].append(float(info[4])) + amplitudes['trial'].append(1.0) + onsets['gain'].append(float(info[0])) + durations['gain'].append(float(info[4])) + amplitudes['gain'].append(float(info[2])) + onsets['loss'].append(float(info[0])) + durations['loss'].append(float(info[4])) + amplitudes['loss'].append(float(info[3])) + onsets['difficulty'].append(float(info[0])) + durations['difficulty'].append(float(info[4])) + amplitudes['difficulty'].append( + abs(0.5 * float(info[2]) - float(info[3])) + ) + onsets['response'].append(float(info[0]) + float(info[4])) + durations['response'].append(0.0) + amplitudes['response'].append(1.0) else: - if c=='missed': - onset[c].append(float(info[0])) - duration[c].append(float(0)) - - #for c in ['gain', 'loss']: - # amplitude[c] = amplitude[c] - np.mean(amplitude[c]) - - - subject_info = [] - - subject_info.append(Bunch(conditions=cond_names, - onsets=[onset[k] for k in cond_names], - durations=[duration[k] for k in cond_names], - amplitudes=[amplitude[k] for k in cond_names], - regressor_names=None, - regressors=None)) - - return subject_info - -def get_parameters_file(file, subject_id, run_id, result_dir, working_dir): - ''' - Create new tsv files with only desired parameters per subject per run. - - Parameters : - - filepaths : paths to subject parameters file (i.e. one per run) - - subject_id : subject for whom the 1st level analysis is made - - run_id: run for which the 1st level analysis is made - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - Return : - - parameters_file : paths to new files containing only desired parameters. - ''' - import pandas as pd - import numpy as np - from os.path import join as opj - import os - - parameters_file = [] - - df = pd.read_csv(file, sep = '\t', header=0) - if 'NonSteadyStateOutlier00' in df.columns: - temp_list = np.array([df['X'], df['Y'], df['Z'], - df['RotX'], df['RotY'], df['RotZ'], - df['NonSteadyStateOutlier00']]) - else: - temp_list = np.array([df['X'], df['Y'], df['Z'], - df['RotX'], df['RotY'], df['RotZ']])# Parameters we want to use for the model - retained_parameters = pd.DataFrame(np.transpose(temp_list)) - new_path =opj(result_dir, working_dir, 'parameters_file', f"parameters_file_sub-{subject_id}_run{run_id}.tsv") - if not os.path.isdir(opj(result_dir, working_dir, 'parameters_file')): - os.mkdir(opj(result_dir, working_dir, 'parameters_file')) - writer = open(new_path, "w") - writer.write(retained_parameters.to_csv(sep = '\t', index = False, header = False, na_rep = '0.0')) - writer.close() - - parameters_file = new_path - os.system('export PATH=$PATH:/local/egermani/ICA-AROMA') - - return parameters_file - -# Linear contrast effects: 'Gain' vs. baseline, 'Loss' vs. baseline. -def get_contrasts(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - # list of condition names - conditions = ['trial', 'gain', 'loss'] - - # create contrasts - gain = ('gain', 'T', conditions, [0, 1, 0]) - - loss = ('loss', 'T', conditions, [0, 0, 1]) - - # contrast list - contrasts = [gain, loss] - - return contrasts - - -def rm_smoothed_files(files, subject_id, run_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - smooth_dir = opj(result_dir, working_dir, 'l1_analysis', f"_run_id_{run_id}_subject_id_{subject_id}", 'smooth') - - try: - shutil.rmtree(smooth_dir) - except OSError as e: - print(e) - else: - print("The directory is deleted successfully") - - -def get_l1_analysis(subject_list, run_list, TR, fwhm, exp_dir, output_dir, working_dir, result_dir): - """ - Returns the first level analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the analysis - - run_list: list of str, list of runs for which you want to do the analysis - - fwhm: float, fwhm for smoothing step - - TR: float, time repetition used during acquisition - - Returns: - - l1_analysis : Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource = Node(IdentityInterface(fields = ['subject_id', 'run_id']), name = 'infosource') - infosource.iterables = [('subject_id', subject_list), - ('run_id', run_list)] - - # Templates to select files node - func_file = opj('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz') - - event_file = opj('sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') - - param_file = opj('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold_confounds.tsv') - - template = {'func' : func_file, 'event' : event_file, 'param' : param_file} - - # SelectFiles node - to select necessary files - selectfiles = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles') - - # DataSink Node - store the wanted results in the wanted repository - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - ## Skullstripping - skullstrip = Node(BET(frac = 0.1, functional = True, mask = True), name = 'skullstrip') - - ## Smoothing - smooth = Node(IsotropicSmooth(fwhm = 6), name = 'smooth') - - # Node contrasts to get contrasts - contrasts = Node(Function(function=get_contrasts, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts') - - # Get Subject Info - get subject specific condition information - get_subject_infos = Node(Function(input_names=['event_file'], - output_names=['subject_info'], - function=get_session_infos), - name='get_subject_infos') - - specify_model = Node(SpecifyModel(high_pass_filter_cutoff = 100, - input_units = 'secs', - time_repetition = TR), name = 'specify_model') - - parameters = Node(Function(function=get_parameters_file, - input_names=['file', 'subject_id', 'run_id', 'result_dir', 'working_dir'], - output_names=['parameters_file']), - name='parameters') - - parameters.inputs.result_dir = result_dir - parameters.inputs.working_dir = working_dir - - # First temporal derivatives of the two regressors were also used, - # along with temporal filtering (60 s) of all the independent variable time-series. - # No motion parameter regressors used. - - l1_design = Node(Level1Design(bases = {'dgamma':{'derivs' : True}}, - interscan_interval = TR, - model_serial_correlations = True), name = 'l1_design') - - model_generation = Node(FEATModel(), name = 'model_generation') - - model_estimate = Node(FILMGLS(), name='model_estimate') - - remove_smooth = Node(Function(input_names = ['subject_id', 'run_id', 'files', 'result_dir', 'working_dir'], - function = rm_smoothed_files), name = 'remove_smooth') - - remove_smooth.inputs.result_dir = result_dir - remove_smooth.inputs.working_dir = working_dir - - # Create l1 analysis workflow and connect its nodes - l1_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l1_analysis") - - l1_analysis.connect([(infosource, selectfiles, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles, get_subject_infos, [('event', 'event_file')]), - (selectfiles, parameters, [('param', 'file')]), - (infosource, contrasts, [('subject_id', 'subject_id')]), - (infosource, parameters, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles, skullstrip, [('func', 'in_file')]), - (skullstrip, smooth, [('out_file', 'in_file')]), - (parameters, specify_model, [('parameters_file', 'realignment_parameters')]), - (smooth, specify_model, [('out_file', 'functional_runs')]), - (get_subject_infos, specify_model, [('subject_info', 'subject_info')]), - (contrasts, l1_design, [('contrasts', 'contrasts')]), - (specify_model, l1_design, [('session_info', 'session_info')]), - (l1_design, model_generation, [('ev_files', 'ev_files'), ('fsf_files', 'fsf_file')]), - (smooth, model_estimate, [('out_file', 'in_file')]), - (model_generation, model_estimate, [('con_file', 'tcon_file'), - ('design_file', 'design_file')]), - (infosource, remove_smooth, [('subject_id', 'subject_id'), ('run_id', 'run_id')]), - (model_estimate, remove_smooth, [('results_dir', 'files')]), - (model_estimate, datasink, [('results_dir', 'l1_analysis.@results')]), - (model_generation, datasink, [('design_file', 'l1_analysis.@design_file'), - ('design_image', 'l1_analysis.@design_img')]), - (skullstrip, datasink, [('mask_file', 'l1_analysis.@skullstriped')]) - ]) - - return l1_analysis - -def get_l2_analysis(subject_list, contrast_list, run_list, exp_dir, output_dir, working_dir, result_dir, data_dir): - """ - Returns the 2nd level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - run_list: list of str, list of runs for which you want to do the analysis - - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_2ndlevel = Node(IdentityInterface(fields=['subject_id', 'contrast_id']), name='infosource_2ndlevel') - infosource_2ndlevel.iterables = [('subject_id', subject_list), ('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'cope{contrast_id}.nii.gz') - - varcopes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'varcope{contrast_id}.nii.gz') - - mask_file = opj(data_dir, 'NARPS-T54A', 'hypo1_cope.nii.gz') - - template = {'cope' : copes_file, 'varcope' : varcopes_file, 'mask':mask_file} - - # SelectFiles node - to select necessary files - selectfiles_2ndlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_2ndlevel') - - datasink_2ndlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_2ndlevel') - - # Generate design matrix - specify_model_2ndlevel = Node(L2Model(num_copes = len(run_list)), name='l2model_2ndlevel') - - # Merge copes and varcopes files for each subject - merge_copes_2ndlevel = Node(Merge(dimension='t'), name='merge_copes_2ndlevel') - - merge_varcopes_2ndlevel = Node(Merge(dimension='t'), name='merge_varcopes_2ndlevel') - - # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. - flame = Node(FLAMEO(run_mode = 'flame1'), - name='flameo') - - l2_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l2_analysis") - - l2_analysis.connect([(infosource_2ndlevel, selectfiles_2ndlevel, [('subject_id', 'subject_id'), - ('contrast_id', 'contrast_id')]), - (selectfiles_2ndlevel, merge_copes_2ndlevel, [('cope', 'in_files')]), - (selectfiles_2ndlevel, merge_varcopes_2ndlevel, [('varcope', 'in_files')]), - (selectfiles_2ndlevel, flame, [('mask', 'mask_file')]), - (merge_copes_2ndlevel, flame, [('merged_file', 'cope_file')]), - (merge_varcopes_2ndlevel, flame, [('merged_file', 'var_cope_file')]), - (specify_model_2ndlevel, flame, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (flame, datasink_2ndlevel, [('zstats', 'l2_analysis.@stats'), - ('tstats', 'l2_analysis.@tstats'), - ('copes', 'l2_analysis.@copes'), - ('var_copes', 'l2_analysis.@varcopes')])]) - - return l2_analysis - -def get_subgroups_contrasts(copes, varcopes, subject_ids, participants_file): - ''' - Parameters : - - copes: original file list selected by selectfiles node - - varcopes: original file list selected by selectfiles node - - subject_ids: list of subject IDs that are analyzed - - participants_file: str, file containing participants characteristics - - This function return the file list containing only the files belonging to subject in the wanted group. - ''' - - from os.path import join as opj - - equalRange_id = [] - equalIndifference_id = [] - - subject_list = ['sub-' + str(i) for i in subject_ids] - - with open(participants_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - if info[0] in subject_list and info[1] == "equalIndifference": - equalIndifference_id.append(info[0][-3:]) - elif info[0] in subject_list and info[1] == "equalRange": - equalRange_id.append(info[0][-3:]) - - copes_equalIndifference = [] - copes_equalRange = [] - copes_global = [] - varcopes_equalIndifference = [] - varcopes_equalRange = [] - varcopes_global = [] - - for file in copes: - sub_id = file.split('/') - if sub_id[-1][4:7] in equalIndifference_id: - copes_equalIndifference.append(file) - elif sub_id[-1][4:7] in equalRange_id: - copes_equalRange.append(file) - if sub_id[-1][4:7] in subject_ids: - copes_global.append(file) - - for file in varcopes: - sub_id = file.split('/') - if sub_id[-1][4:7] in equalIndifference_id: - varcopes_equalIndifference.append(file) - elif sub_id[-1][4:7] in equalRange_id: - varcopes_equalRange.append(file) - if sub_id[-1][4:7] in subject_ids: - varcopes_global.append(file) - - return copes_equalIndifference, copes_equalRange, varcopes_equalIndifference, varcopes_equalRange, equalIndifference_id, equalRange_id, copes_global, varcopes_global - -def get_regs(equalRange_id, equalIndifference_id, method, subject_list): - """ - Create dictionary of regressors for group analysis. - - Parameters: - - equalRange_id: list of str, ids of subjects in equal range group - - equalIndifference_id: list of str, ids of subjects in equal indifference group - - method: one of "equalRange", "equalIndifference" or "groupComp" - - subject_list: list of str, ids of subject for which to do the analysis - - Returns: - - regressors: dict, dictionary of regressors used to distinguish groups in FSL group analysis - """ - if method == "equalRange": - regressors = dict(group_mean = [1 for i in range(len(equalRange_id))]) - - elif method == "equalIndifference": - regressors = dict(group_mean = [1 for i in range(len(equalIndifference_id))]) - - elif method == "groupComp": - equalRange_reg = [1 for i in range(len(equalRange_id) + len(equalIndifference_id))] - equalIndifference_reg = [0 for i in range(len(equalRange_id) + len(equalIndifference_id))] - - for i, sub_id in enumerate(subject_list): - if sub_id in equalIndifference_id: - index = i - equalIndifference_reg[index] = 1 - equalRange_reg[index] = 0 - - regressors = dict(equalRange = equalRange_reg, - equalIndifference = equalIndifference_reg) - - return regressors - -def get_group_workflow(subject_list, n_sub, contrast_list, method, exp_dir, output_dir, - working_dir, result_dir, data_dir): - """ - Returns the group level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - method: one of "equalRange", "equalIndifference" or "groupComp" - - n_sub: int, number of subjects - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_3rdlevel = Node(IdentityInterface(fields = ['contrast_id', 'exp_dir', 'result_dir', - 'output_dir', 'working_dir', 'subject_list', 'method'], - exp_dir = exp_dir, result_dir = result_dir, - output_dir = output_dir, working_dir = working_dir, - subject_list = subject_list, method = method), - name = 'infosource_3rdlevel') - infosource_3rdlevel.iterables = [('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(data_dir, 'NARPS-T54A', 'sub-*_contrast-{contrast_id}_cope.nii.gz') # contrast_id = ploss ou pgain - - varcopes_file = opj(data_dir, 'NARPS-T54A', 'sub-*_contrast-{contrast_id}_varcope.nii.gz') - - participants_file = opj(exp_dir, 'participants.tsv') - - mask_file = opj(data_dir, 'NARPS-T54A', 'hypo2_unthresh_Z.nii.gz') - - template = {'cope' : copes_file, 'varcope' : varcopes_file, 'participants' : participants_file, - 'mask' : mask_file} - - # SelectFiles node - to select necessary files - selectfiles_3rdlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_3rdlevel') - - datasink_3rdlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_3rdlevel') - - merge_copes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_copes_3rdlevel') - merge_varcopes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_varcopes_3rdlevel') - - subgroups_contrasts = Node(Function(input_names = ['copes', 'varcopes', 'subject_ids', 'participants_file'], - output_names = ['copes_equalIndifference', 'copes_equalRange', - 'varcopes_equalIndifference', 'varcopes_equalRange', - 'equalIndifference_id', 'equalRange_id', 'copes_global', - 'varcopes_global'], - function = get_subgroups_contrasts), - name = 'subgroups_contrasts') - - specifymodel_3rdlevel = Node(MultipleRegressDesign(), name = 'specifymodel_3rdlevel') - - flame_3rdlevel = Node(FLAMEO(run_mode = 'flame1'), - name='flame_3rdlevel') - - regs = Node(Function(input_names = ['equalRange_id', 'equalIndifference_id', 'method', 'subject_list'], - output_names = ['regressors'], - function = get_regs), name = 'regs') - regs.inputs.method = method - regs.inputs.subject_list = subject_list - - randomise = Node(Randomise(num_perm = 10000, tfce=True, vox_p_values=True, c_thresh=0.05, tfce_E=0.01), - name = "randomise") - - l3_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = f"l3_analysis_{method}_nsub_{n_sub}") - - l3_analysis.connect([(infosource_3rdlevel, selectfiles_3rdlevel, [('contrast_id', 'contrast_id')]), - (infosource_3rdlevel, subgroups_contrasts, [('subject_list', 'subject_ids')]), - (selectfiles_3rdlevel, subgroups_contrasts, [('cope', 'copes'), ('varcope', 'varcopes'), - ('participants', 'participants_file')]), - (selectfiles_3rdlevel, flame_3rdlevel, [('mask', 'mask_file')]), - (selectfiles_3rdlevel, randomise, [('mask', 'mask')]), - (subgroups_contrasts, regs, [('equalRange_id', 'equalRange_id'), - ('equalIndifference_id', 'equalIndifference_id')]), - (regs, specifymodel_3rdlevel, [('regressors', 'regressors')])]) - - - if method == 'equalIndifference' or method == 'equalRange': - specifymodel_3rdlevel.inputs.contrasts = [["group_mean", "T", ["group_mean"], [1]], ["group_mean_neg", "T", ["group_mean"], [-1]]] - - if method == 'equalIndifference': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, - [('copes_equalIndifference', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, - [('varcopes_equalIndifference', 'in_files')])]) - elif method == 'equalRange': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, [('copes_equalRange', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, [('varcopes_equalRange', 'in_files')])]) - - elif method == "groupComp": - specifymodel_3rdlevel.inputs.contrasts = [["equalRange_sup", "T", ["equalRange", "equalIndifference"], - [1, -1]]] - - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, [('copes_global', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, [('varcopes_global', 'in_files')])]) - - l3_analysis.connect([(merge_copes_3rdlevel, flame_3rdlevel, [('merged_file', 'cope_file')]), - (merge_varcopes_3rdlevel, flame_3rdlevel, [('merged_file', 'var_cope_file')]), - (specifymodel_3rdlevel, flame_3rdlevel, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (merge_copes_3rdlevel, randomise, [('merged_file', 'in_file')]), - (specifymodel_3rdlevel, randomise, [('design_mat', 'design_mat'), - ('design_con', 'tcon')]), - (randomise, datasink_3rdlevel, [('t_corrected_p_files', - f"l3_analysis_{method}_nsub_{n_sub}.@tcorpfile"), - ('tstat_files', f"l3_analysis_{method}_nsub_{n_sub}.@tstat")]), - (flame_3rdlevel, datasink_3rdlevel, [('zstats', - f"l3_analysis_{method}_nsub_{n_sub}.@zstats"), - ('tstats', - f"l3_analysis_{method}_nsub_{n_sub}.@tstats")]), + onsets['missed'].append(float(info[0])) + durations['missed'].append(0.0) + amplitudes['missed'].append(1.0) + + # Check if there where missed trials for this run + if not onsets['missed']: + condition_names.remove('missed') + + return [ + Bunch( + conditions = condition_names, + onsets = [onsets[k] for k in condition_names], + durations = [durations[k] for k in condition_names], + amplitudes = [amplitudes[k] for k in condition_names], + regressor_names = None, + regressors = None) + ] + + def get_parameters_file(filepath, subject_id, run_id, working_dir): + """ + Create a tsv file with only desired parameters per subject per run. + + Parameters : + - filepath : path to the subject parameters file (i.e. one per run) + - subject_id : subject for whom the 1st level analysis is made + - run_id: run for which the 1st level analysis is made + - working_dir: str, name of the directory for intermediate results + + Return : + - parameters_file : paths to new files containing only desired parameters. + """ + from os import makedirs + from os.path import join + + from pandas import read_csv, DataFrame + from numpy import array, transpose + + data_frame = read_csv(filepath, sep = '\t', header=0) + if 'NonSteadyStateOutlier00' in data_frame.columns: + temp_list = array([ + data_frame['X'], data_frame['Y'], data_frame['Z'], + data_frame['RotX'], data_frame['RotY'], data_frame['RotZ'], + data_frame['NonSteadyStateOutlier00']]) + else: + temp_list = array([ + data_frame['X'], data_frame['Y'], data_frame['Z'], + data_frame['RotX'], data_frame['RotY'], data_frame['RotZ']]) + retained_parameters = DataFrame(transpose(temp_list)) + + parameters_file = join(working_dir, 'parameters_file', + f'parameters_file_sub-{subject_id}_run-{run_id}.tsv') + + makedirs(join(working_dir, 'parameters_file'), exist_ok = True) + + with open(parameters_file, 'w') as writer: + writer.write(retained_parameters.to_csv( + sep = '\t', index = False, header = False, na_rep = '0.0')) + + return parameters_file + + def get_run_level_analysis(self): + """ + Create the run level analysis workflow. + + Returns: + - run_level_analysis : nipype.WorkFlow + """ + # IdentityInterface Node - To iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list) + ] + + # SelectFiles - to select necessary files + template = { + # Parameter file + 'param' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_confounds.tsv'), + # Functional MRI + 'func' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz' + ), + # Event file + 'event' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') + } + select_files = Node(SelectFiles(template), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # BET Node - Skullstripping data + skull_stripping_func = Node(BET(), name = 'skull_stripping_func') + skull_stripping_func.inputs.frac = 0.3 + skull_stripping_func.inputs.functional = True + skull_stripping_func.inputs.mask = True + + # IsotropicSmooth Node - Smoothing data + smoothing_func = Node(IsotropicSmooth(), name = 'smoothing_func') + smoothing_func.inputs.fwhm = self.fwhm # TODO : Previously set to 6 mm ? + + # Function Node get_subject_infos - Get subject specific condition information + subject_information = Node(Function( + function = self.get_subject_information, + input_names = ['event_file'], + output_names = ['subject_info'] + ), name = 'subject_information') + + # SpecifyModel Node - Generate run level model + specify_model = Node(SpecifyModel(), name = 'specify_model') + specify_model.inputs.high_pass_filter_cutoff = 100 + specify_model.inputs.input_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + + # Function Node get_parameters_file - Get files with movement parameters + parameters = Node(Function( + function = self.get_parameters_file, + input_names = ['filepath', 'subject_id', 'run_id', 'working_dir'], + output_names = ['parameters_file']), + name = 'parameters') + parameters.inputs.working_dir = self.directories.working_dir + + # Level1Design Node - Generate files for run level computation + model_design = Node(Level1Design(), name = 'model_design') + model_design.inputs.bases = {'dgamma':{'derivs' : True}} + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.model_serial_correlations = True + model_design.inputs.contrasts = self.run_level_contrasts + + # FEATModel Node - Generate run level model + model_generation = Node(FEATModel(), name = 'model_generation') + + # FILMGLS Node - Estimate first level model + model_estimate = Node(FILMGLS(), name = 'model_estimate') + + # Create l1 analysis workflow and connect its nodes + run_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'run_level_analysis' + ) + run_level_analysis.connect([ + (information_source, select_files, [ + ('subject_id', 'subject_id'), + ('run_id', 'run_id')]), + (select_files, subject_information, [('event', 'event_file')]), + (select_files, parameters, [('param', 'filepath')]), + (information_source, parameters, [ + ('subject_id', 'subject_id'), + ('run_id', 'run_id')]), + (select_files, skull_stripping_func, [('func', 'in_file')]), + (skull_stripping_func, smoothing_func, [('out_file', 'in_file')]), + (parameters, specify_model, [('parameters_file', 'realignment_parameters')]), + (smoothing_func, specify_model, [('out_file', 'functional_runs')]), + (subject_information, specify_model, [('subject_info', 'subject_info')]), + (specify_model, model_design, [('session_info', 'session_info')]), + (model_design, model_generation, [ + ('ev_files', 'ev_files'), + ('fsf_files', 'fsf_file')]), + (smoothing_func, model_estimate, [('out_file', 'in_file')]), + (model_generation, model_estimate, [ + ('con_file', 'tcon_file'), + ('design_file', 'design_file')]), + (model_estimate, data_sink, [('results_dir', 'run_level_analysis.@results')]), + (model_generation, data_sink, [ + ('design_file', 'run_level_analysis.@design_file'), + ('design_image', 'run_level_analysis.@design_img')]), + (skull_stripping_func, data_sink, [('mask_file', 'run_level_analysis.@skullstriped')]) + ]) + + # Remove large files, if requested + if Configuration()['pipelines']['remove_unused_data']: + + # Remove Node - Remove skullstriped func files once they are no longer needed + remove_skullstrip = Node( + InterfaceFactory.create('remove_parent_directory'), + name = 'remove_skullstrip') + + # Remove Node - Remove smoothed files once they are no longer needed + remove_smooth = Node( + InterfaceFactory.create('remove_parent_directory'), + name = 'remove_smooth') + + # Add connections + run_level_analysis.connect([ + (data_sink, remove_skullstrip, [('out_file', '_')]), + (skull_stripping_func, remove_skullstrip, [('out_file', 'file_name')]), + (model_estimate, remove_smooth, [('results_dir', '_')]), + (smoothing_func, remove_smooth, [('out_file', 'file_name')]) + ]) + + return run_level_analysis + + def get_run_level_outputs(self): + """ Return the names of the files the run level analysis is supposed to generate. """ + + return_list = [] + output_dir = join(self.directories.output_dir, 'run_level_analysis', + '_run_id_{run_id}_subject_id_{subject_id}') + + # Handle results dir + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'contrast_id' : self.contrast_list + } + parameter_sets = product(*parameters.values()) + templates = [ + join(output_dir, 'results', 'cope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'tstat{contrast_id}.nii.gz'), + join(output_dir, 'results', 'varcope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'zstat{contrast_id}.nii.gz') + ] + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + # Handle mask file + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + } + parameter_sets = product(*parameters.values()) + template = join(output_dir, 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc_brain_mask.nii.gz') + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Infosource Node - To iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('contrast_id', self.contrast_list) + ] + + # Templates to select files node + templates = { + 'cope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'cope{contrast_id}.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'varcope{contrast_id}.nii.gz'), + 'masks': join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_preproc_brain_mask.nii.gz') + } + + # SelectFiles Node - to select necessary files + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # L2Model Node - Generate subject specific second level model + generate_model = Node(L2Model(), name = 'generate_model') + generate_model.inputs.num_copes = len(self.run_list) + + # Merge Node - Merge copes files for each subject + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + + # Merge Node - Merge varcopes files for each subject + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.run_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.run_list) - 1) + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + + # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis') + subject_level_analysis.connect([ + (information_source, select_files, [ + ('subject_id', 'subject_id'), + ('contrast_id', 'contrast_id')]), + (select_files, merge_copes, [('cope', 'in_files')]), + (select_files, merge_varcopes, [('varcope', 'in_files')]), + (select_files, split_masks, [('masks', 'inlist')]), + (split_masks, mask_intersection, [('out1', 'in_file')]), + (split_masks, mask_intersection, [('out2', 'operand_files')]), + (mask_intersection, estimate_model, [('out_file', 'mask_file')]), + (merge_copes, estimate_model, [('merged_file', 'cope_file')]), + (merge_varcopes, estimate_model, [('merged_file', 'var_cope_file')]), + (generate_model, estimate_model, [ + ('design_mat', 'design_file'), + ('design_con', 't_con_file'), + ('design_grp', 'cov_split_file')]), + (mask_intersection, data_sink, [('out_file', 'subject_level_analysis.@mask')]), + (estimate_model, data_sink, [ + ('zstats', 'subject_level_analysis.@stats'), + ('tstats', 'subject_level_analysis.@tstats'), + ('copes', 'subject_level_analysis.@copes'), + ('var_copes', 'subject_level_analysis.@varcopes')])]) + + return subject_level_analysis + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + } + parameter_sets = product(*parameters.values()) + output_dir = join(self.directories.output_dir, 'subject_level_analysis', + '_contrast_id_{contrast_id}_subject_id_{subject_id}') + templates = [ + join(output_dir, 'cope1.nii.gz'), + join(output_dir, 'tstat1.nii.gz'), + join(output_dir, 'varcope1.nii.gz'), + join(output_dir, 'zstat1.nii.gz'), + join(output_dir, 'sub-{subject_id}_task-MGT_run-01_bold_space-MNI152NLin2009cAsym_preproc_brain_mask_maths.nii.gz') + ] + + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + def get_one_sample_t_test_regressors(subject_list: list) -> dict: + """ + Create dictionary of regressors for one sample t-test group analysis. + + Parameters: + - subject_list: ids of subject in the group for which to do the analysis + + Returns: + - dict containing named lists of regressors. + """ + + return dict(group_mean = [1 for _ in subject_list]) + + def get_two_sample_t_test_regressors( + equal_range_ids: list, + equal_indifference_ids: list, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for two sample t-test group analysis. + + Parameters: + - equal_range_ids: ids of subjects in equal range group + - equal_indifference_ids: ids of subjects in equal indifference group + - subject_list: ids of subject for which to do the analysis + + Returns: + - regressors, dict: containing named lists of regressors. + - groups, list: group identifiers to distinguish groups in FSL analysis. + """ + + # Create 2 lists containing n_sub values which are + # * 1 if the participant is on the group + # * 0 otherwise + equal_range_regressors = [1 if i in equal_range_ids else 0 for i in subject_list] + equal_indifference_regressors = [ + 1 if i in equal_indifference_ids else 0 for i in subject_list + ] + + # Create regressors output : a dict with the two list + regressors = dict( + equalRange = equal_range_regressors, + equalIndifference = equal_indifference_regressors + ) + + # Create groups outputs : a list with 1 for equalRange subjects and 2 for equalIndifference + groups = [1 if i == 1 else 2 for i in equal_range_regressors] + + return regressors, groups + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level_analysis: nipype.WorkFlow + """ + # Infosource Node - iterate over the contrasts generated by the subject level analysis + information_source = Node(IdentityInterface( + fields = ['contrast_id']), + name = 'information_source') + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles Node - select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'cope1.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'varcope1.nii.gz'), + 'masks': join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_1_subject_id_*', + 'sub-*_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_preproc_brain_mask_maths.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + + # Datasink Node - save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + + # Function Node elements_in_string + # Get contrast of parameter estimates (cope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_copes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_copes', iterfield = 'input_str' + ) + + # Function Node elements_in_string + # Get variance of the estimated copes (varcope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_varcopes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_varcopes', iterfield = 'input_str' + ) + + # Merge Node - Merge cope files + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + + # Merge Node - Merge cope files + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.subject_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.subject_list) - 1) + + # MultipleRegressDesign Node - Specify model + specify_model = Node(MultipleRegressDesign(), name = 'specify_model') + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + + # Randomise Node - + randomise = Node(Randomise(), name = 'randomise') + randomise.inputs.num_perm = 10000 + randomise.inputs.tfce = True + randomise.inputs.vox_p_values = True + randomise.inputs.c_thresh = 0.05 + randomise.inputs.tfce_E = 0.01 + + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Declare the workflow + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + group_level_analysis.connect([ + (information_source, select_files, [('contrast_id', 'contrast_id')]), + (select_files, get_copes, [('cope', 'input_str')]), + (select_files, get_varcopes, [('varcope', 'input_str')]), + (get_copes, merge_copes, [(('out_list', clean_list), 'in_files')]), + (get_varcopes, merge_varcopes,[(('out_list', clean_list), 'in_files')]), + (select_files, split_masks, [('masks', 'inlist')]), + (split_masks, mask_intersection, [('out1', 'in_file')]), + (split_masks, mask_intersection, [('out2', 'operand_files')]), + (mask_intersection, estimate_model, [('out_file', 'mask_file')]), + (mask_intersection, randomise, [('out_file', 'mask')]), + (merge_copes, estimate_model, [('merged_file', 'cope_file')]), + (merge_varcopes, estimate_model, [('merged_file', 'var_cope_file')]), + (specify_model, estimate_model, [ + ('design_mat', 'design_file'), + ('design_con', 't_con_file'), + ('design_grp', 'cov_split_file') + ]), + (merge_copes, randomise, [('merged_file', 'in_file')]), + (specify_model, randomise, [ + ('design_mat', 'design_mat'), + ('design_con', 'tcon') + ]), + (randomise, data_sink, [ + ('t_corrected_p_files', f'group_level_analysis_{method}_nsub_{nb_subjects}.@tcorpfile'), + ('tstat_files', f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstat') + ]), + (estimate_model, data_sink, [ + ('zstats', f'group_level_analysis_{method}_nsub_{nb_subjects}.@zstats'), + ('tstats', f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstats') + ]) + ]) + + if method in ('equalIndifference', 'equalRange'): + + # Setup a one sample t-test + specify_model.inputs.contrasts = [ + ['group_mean', 'T', ['group_mean'], [1]], + ['group_mean_neg', 'T', ['group_mean'], [-1]] + ] + + # Function Node get_group_subjects - Get subjects in the group and in the subject_list + get_group_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_group_subjects' + ) + get_group_subjects.inputs.list_1 = get_group(method) + get_group_subjects.inputs.list_2 = self.subject_list + + # Function Node get_one_sample_t_test_regressors + # Get regressors in the equalRange and equalIndifference method case + regressors_one_sample = Node( + Function( + function = self.get_one_sample_t_test_regressors, + input_names = ['subject_list'], + output_names = ['regressors'] + ), + name = 'regressors_one_sample', + ) + + # Add missing connections + group_level_analysis.connect([ + (get_group_subjects, get_copes, [('out_list', 'elements')]), + (get_group_subjects, get_varcopes, [('out_list', 'elements')]), + (get_group_subjects, regressors_one_sample, [('out_list', 'subject_list')]), + (regressors_one_sample, specify_model, [('regressors', 'regressors')]) ]) - - return l3_analysis - -def reorganize_results(result_dir, output_dir, n_sub, team_ID): - """ - Reorganize the results to analyze them. - - Parameters: - - result_dir: str, directory where results will be stored - - output_dir: str, name of the sub-directory for final results - - n_sub: float, number of subject used for the analysis - - team_ID: str, ID of the team to reorganize results - - """ - from os.path import join as opj - import os - import shutil - import gzip - import nibabel as nib - import numpy as np - - h1 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_pgain') - h2 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_pgain') - h3 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_pgain') - h4 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_pgain') - h5 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_ploss') - h6 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_ploss') - h7 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_ploss') - h8 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_ploss') - h9 = opj(result_dir, output_dir, f"l3_analysis_groupComp_nsub_{n_sub}", '_contrast_id_ploss') - - h = [h1, h2, h3, h4, h5, h6, h7, h8, h9] - - repro_unthresh = [opj(filename, "zstat2.nii.gz") if i in [4, 5] else opj(filename, "zstat1.nii.gz") for i, filename in enumerate(h)] - - repro_thresh = [opj(filename, "randomise_tfce_corrp_tstat2.nii.gz") if i in [4, 5] else opj(filename, 'randomise_tfce_corrp_tstat1.nii.gz') for i, filename in enumerate(h)] - - if not os.path.isdir(opj(result_dir, "NARPS-reproduction")): - os.mkdir(opj(result_dir, "NARPS-reproduction")) - - for i, filename in enumerate(repro_unthresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_unthresholded.nii.gz") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii.gz") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - img = nib.load(filename) - original_affine = img.affine.copy() - spm = nib.load(repro_unthresh[i]) - new_img = img.get_fdata().astype(float) > 0 - new_img = new_img.astype(float) - print(np.unique(new_img)) - print(np.unique(spm.get_fdata())) - new_img = new_img * spm.get_fdata() - print(np.unique(new_img)) - new_spm = nib.Nifti1Image(new_img, original_affine) - nib.save(new_spm, opj(result_dir, "NARPS-reproduction", - f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii.gz")) - - print(f"Results files of team {team_ID} reorganized.") + + elif method == 'groupComp': + + # Select copes and varcopes corresponding to the selected subjects + # Indeed the SelectFiles node asks for all (*) subjects available + get_copes.inputs.elements = self.subject_list + get_varcopes.inputs.elements = self.subject_list + + # Setup a two sample t-test + specify_model.inputs.contrasts = [ + ['equalRange_sup', 'T', ['equalRange', 'equalIndifference'], [1, -1]] + ] + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node get_two_sample_t_test_regressors + # Get regressors in the groupComp method case + regressors_two_sample = Node( + Function( + function = self.get_two_sample_t_test_regressors, + input_names = [ + 'equal_range_ids', + 'equal_indifference_ids', + 'subject_list', + ], + output_names = ['regressors', 'groups'] + ), + name = 'regressors_two_sample', + ) + regressors_two_sample.inputs.subject_list = self.subject_list + + # Add missing connections + group_level_analysis.connect([ + (get_equal_range_subjects, regressors_two_sample, [ + ('out_list', 'equal_range_ids') + ]), + (get_equal_indifference_subjects, regressors_two_sample, [ + ('out_list', 'equal_indifference_ids') + ]), + (regressors_two_sample, specify_model, [ + ('regressors', 'regressors'), + ('groups', 'groups')]) + ]) + + return group_level_analysis + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'randomise_tfce_corrp_tstat2.nii.gz', + 'randomise_tstat1.nii.gz', + 'randomise_tstat2.nii.gz', + 'tstat1.nii.gz', + 'tstat2.nii.gz', + 'zstat1.nii.gz', + 'zstat2.nii.gz' + ], + 'nb_subjects' : [str(len(self.subject_list))] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_{nb_subjects}', + '_contrast_id_{contrast_id}', + '{file}' + ) + + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + files = [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'randomise_tstat1.nii.gz', + 'zstat1.nii.gz', + 'tstat1.nii.gz' + ] + + return_list += [join( + self.directories.output_dir, + f'group_level_analysis_groupComp_nsub_{len(self.subject_list)}', + '_contrast_id_2', f'{file}') for file in files] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + + nb_sub = len(self.subject_list) + files = [ + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1.nii.gz'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/setup.py b/setup.py index 91a2d63a..185c8418 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ ] extras_require = { 'tests': [ + 'pathvalidate', 'pylint', 'pytest', 'pytest-cov', diff --git a/tests/conftest.py b/tests/conftest.py index e01e4a00..d46df024 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,9 @@ from shutil import rmtree from pytest import helpers +from pathvalidate import is_valid_filepath +from narps_open.pipelines import Pipeline from narps_open.runner import PipelineRunner from narps_open.utils import get_subject_id from narps_open.utils.correlation import get_correlation_coefficient @@ -21,6 +23,30 @@ # Init configuration, to ensure it is in testing mode Configuration(config_type='testing') +@helpers.register +def test_pipeline_outputs(pipeline: Pipeline, number_of_outputs: list): + """ Test the outputs of a Pipeline. + Arguments: + - pipeline, Pipeline: the pipeline to test + - number_of_outputs, list: a list containing the expected number of outputs for each + stage of the pipeline (preprocessing, run_level, subject_level, group_level, hypotheses) + + Return: True if the outputs are in sufficient number and each ones name is valid, + False otherwise. + """ + assert len(number_of_outputs) == 5 + for outputs, number in zip([ + pipeline.get_preprocessing_outputs(), + pipeline.get_run_level_outputs(), + pipeline.get_subject_level_outputs(), + pipeline.get_group_level_outputs(), + pipeline.get_hypotheses_outputs()], number_of_outputs): + + assert len(outputs) == number + for output in outputs: + assert is_valid_filepath(output, platform = 'auto') + assert not any(c in output for c in ['{', '}']) + @helpers.register def test_pipeline_execution( team_id: str, diff --git a/tests/pipelines/test_team_T54A.py b/tests/pipelines/test_team_T54A.py new file mode 100644 index 00000000..05ecc003 --- /dev/null +++ b/tests/pipelines/test_team_T54A.py @@ -0,0 +1,208 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_T54A' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_T54A.py + pytest -q test_team_T54A.py -k +""" +from os import mkdir +from os.path import exists, join +from shutil import rmtree + +from pytest import helpers, mark, fixture +from numpy import isclose +from nipype import Workflow +from nipype.interfaces.base import Bunch + +from narps_open.pipelines.team_T54A import PipelineTeamT54A +from narps_open.utils.configuration import Configuration + +TEMPORARY_DIR = join(Configuration()['directories']['test_runs'], 'test_T54A') + +@fixture +def remove_test_dir(): + """ A fixture to remove temporary directory created by tests """ + + rmtree(TEMPORARY_DIR, ignore_errors = True) + mkdir(TEMPORARY_DIR) + yield # test runs here + rmtree(TEMPORARY_DIR, ignore_errors = True) + +def compare_float_2d_arrays(array_1, array_2): + """ Assert array_1 and array_2 are close enough """ + + assert len(array_1) == len(array_2) + for reference_array, test_array in zip(array_1, array_2): + assert len(reference_array) == len(test_array) + assert isclose(reference_array, test_array).all() + +class TestPipelinesTeamT54A: + """ A class that contains all the unit tests for the PipelineTeamT54A class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeamT54A object """ + + pipeline = PipelineTeamT54A() + + # 1 - check the parameters + assert pipeline.fwhm == 4.0 + assert pipeline.team_id == 'T54A' + assert pipeline.contrast_list == ['1', '2'] + assert pipeline.run_level_contrasts == [ + ('gain', 'T', ['trial', 'gain', 'loss'], [0, 1, 0]), + ('loss', 'T', ['trial', 'gain', 'loss'], [0, 0, 1]) + ] + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeamT54A object """ + pipeline = PipelineTeamT54A() + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + helpers.test_pipeline_outputs(pipeline, [0, 9*4*1, 5*2*1, 8*2*2 + 4, 18]) + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + helpers.test_pipeline_outputs(pipeline, [0, 9*4*4, 5*2*4, 8*2*2 + 4, 18]) + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + test_file_2 = join(Configuration()['directories']['test_data'], + 'pipelines', 'events_resp.tsv') + + # Prepare several scenarii + info_missed = PipelineTeamT54A.get_subject_information(test_file) + info_ok = PipelineTeamT54A.get_subject_information(test_file_2) + + # Compare bunches to expected + bunch = info_missed[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain', 'loss', 'difficulty', 'response', 'missed'] + compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [6.459, 14.123, 29.615, 38.723], + [19.535] + ]) + compare_float_2d_arrays(bunch.durations, [ + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [0.0, 0.0, 0.0, 0.0], + [0.0] + ]) + compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0], + [14.0, 34.0, 10.0, 16.0], + [6.0, 14.0, 15.0, 17.0], + [1.0, 3.0, 10.0, 9.0], + [1.0, 1.0, 1.0, 1.0], + [1.0] + ]) + assert bunch.regressor_names == None + assert bunch.regressors == None + + bunch = info_ok[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain', 'loss', 'difficulty', 'response'] + compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834, 27.535, 36.435], + [6.459, 14.123, 29.615, 38.723] + ]) + compare_float_2d_arrays(bunch.durations, [ + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289, 2.08, 2.288], + [0.0, 0.0, 0.0, 0.0] + ]) + compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0], + [14.0, 34.0, 10.0, 16.0], + [6.0, 14.0, 15.0, 17.0], + [1.0, 3.0, 10.0, 9.0], + [1.0, 1.0, 1.0, 1.0] + ]) + assert bunch.regressor_names == None + assert bunch.regressors == None + + @staticmethod + @mark.unit_test + def test_parameters_file(remove_test_dir): + """ Test the get_parameters_file method """ + + confounds_file_path = join( + Configuration()['directories']['test_data'], 'pipelines', 'confounds.tsv') + + PipelineTeamT54A.get_parameters_file( + confounds_file_path, + 'fake_subject_id', + 'fake_run_id', + TEMPORARY_DIR + ) + + # Check parameter file was created + assert exists(join( + TEMPORARY_DIR, + 'parameters_file', + 'parameters_file_sub-fake_subject_id_run-fake_run_id.tsv') + ) + + @staticmethod + @mark.unit_test + def test_one_sample_t_test_regressors(): + """ Test the get_one_sample_t_test_regressors method """ + + regressors = PipelineTeamT54A.get_one_sample_t_test_regressors(['001', '002']) + assert regressors == {'group_mean': [1, 1]} + + @staticmethod + @mark.unit_test + def test_two_sample_t_test_regressors(): + """ Test the get_two_sample_t_test_regressors method """ + + regressors, groups = PipelineTeamT54A.get_two_sample_t_test_regressors( + ['001', '003'], # equalRange group + ['002', '004'], # equalIndifference group + ['001', '002', '003', '004'] # all subjects + ) + assert regressors == dict( + equalRange = [1, 0, 1, 0], + equalIndifference = [0, 1, 0, 1] + ) + assert groups == [1, 2, 1, 2] + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeamT54A and compare results """ + helpers.test_pipeline_evaluation('T54A') diff --git a/tests/test_conftest.py b/tests/test_conftest.py index de7d72cb..658f5d89 100644 --- a/tests/test_conftest.py +++ b/tests/test_conftest.py @@ -17,7 +17,7 @@ from datetime import datetime -from pytest import mark, helpers, fixture +from pytest import mark, helpers, fixture, raises from nipype import Node, Workflow from nipype.interfaces.utility import Function @@ -239,6 +239,49 @@ def download(self): class TestConftest: """ A class that contains all the unit tests for the conftest module.""" + @staticmethod + @mark.unit_test + def test_test_outputs(set_test_directory): + """ Test the test_pipeline_outputs helper """ + + # Test pipeline + pipeline = MockupPipeline() + pipeline.subject_list = ['001', '002'] + + # Wrong length for nb_of_outputs + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [1,2,3]) + + # Wrong number of outputs + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [0, 2, 2, 20, 18]) + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 0, 2, 20, 18]) + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 0, 20, 18]) + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 0, 18]) + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 20, 0]) + + # Right number of outputs + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 20, 18]) + + # Not a valid path name + pipeline.get_group_level_outputs = lambda : 'not_fo\rmatted' + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 1, 18]) + + # Not a valid path name + pipeline.get_group_level_outputs = lambda : '{not_formatted' + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 1, 18]) + + # Not a valid path name + pipeline.get_group_level_outputs = lambda : '{not_formatted' + with raises(AssertionError): + helpers.test_pipeline_outputs(pipeline, [2, 2, 2, 1, 18]) + @staticmethod @mark.unit_test def test_test_correlation_results(mocker): diff --git a/tests/test_data/pipelines/events_resp.tsv b/tests/test_data/pipelines/events_resp.tsv new file mode 100644 index 00000000..dd5ea1a5 --- /dev/null +++ b/tests/test_data/pipelines/events_resp.tsv @@ -0,0 +1,5 @@ +onset duration gain loss RT participant_response +4.071 4 14 6 2.388 weakly_accept +11.834 4 34 14 2.289 strongly_accept +27.535 4 10 15 2.08 strongly_reject +36.435 4 16 17 2.288 weakly_reject \ No newline at end of file diff --git a/tests/test_data/pipelines/participants.tsv b/tests/test_data/pipelines/participants.tsv new file mode 100644 index 00000000..312dbcde --- /dev/null +++ b/tests/test_data/pipelines/participants.tsv @@ -0,0 +1,5 @@ +participant_id group gender age +sub-001 equalIndifference M 24 +sub-002 equalRange M 25 +sub-003 equalIndifference F 27 +sub-004 equalRange M 25 \ No newline at end of file