[Pipeline Runner] Allow flexible level choice (#235)
* Adding a command line tool showing the correlation results of a pipeline execution

* [DOC] install doc about correlation command line tool [skip ci]

* Modifications on runner

* Correlation main + exclusions in runner

* Allowing flexible level running [skip ci]

* Redefining levels in naprs_open_runner command line [skip ci]

* [TEST] test update [skip ci]

* [DOC] doc update

* Codespell
bclenet authored Jan 9, 2025
1 parent a09b926 commit faa8de6
Showing 6 changed files with 239 additions and 120 deletions.
1 change: 1 addition & 0 deletions INSTALL.md
@@ -131,6 +131,7 @@ narps_open_status --json
> For further information about these command line tools, read the corresponding documentation pages.
> * `narps_open_runner` : [docs/running.md](docs/running.md)
> * `narps_open_tester` : [docs/testing.md](docs/testing.md#command-line-tool)
+> * `narps_open_correlations` : [docs/testing.md](docs/testing.md#command-line-tool)
> * `narps_description` : [docs/description.md](docs/description.md)
> * `narps_results` : [docs/data.md](docs/data.md#results-from-narps-teams)
> * `narps_open_status` : [docs/status.md](docs/status.md)
48 changes: 30 additions & 18 deletions docs/running.md
@@ -9,23 +9,32 @@ The `narps_open.runner` module allows to run pipelines from the command line.
```bash
narps_open_runner -h
-usage: runner.py [-h] -t TEAM (-r RANDOM | -s SUBJECTS [SUBJECTS ...]) [-g | -f]
+usage: narps_open_runner [-h] -t
+                         {08MQ,2T6S,3TR7,4SZ2,4TQ6,51PW,98BT,B23O,C88N,J7F9,L7J7,O21U,O6R6,Q6O0,R9K3,T54A,U26C,UK24,X19V}
+                         (-s SUBJECTS [SUBJECTS ...] | -n NSUBJECTS | -r RSUBJECTS) [-l {p,r,s,g} [{p,r,s,g} ...]]
+                         [-c] [-e]

Run the pipelines from NARPS.

options:
  -h, --help            show this help message and exit
-  -t TEAM, --team TEAM  the team ID
-  -r RANDOM, --random RANDOM  the number of subjects to be randomly selected
-  -s SUBJECTS [SUBJECTS ...], --subjects SUBJECTS [SUBJECTS ...]  a list of subjects
-  -g, --group           run the group level only
-  -f, --first           run the first levels only (preprocessing + subjects + runs)
+  -t {08MQ,2T6S,3TR7,4SZ2,4TQ6,51PW,98BT,B23O,C88N,J7F9,L7J7,O21U,O6R6,Q6O0,R9K3,T54A,U26C,UK24,X19V}, --team {08MQ,2T6S,3TR7,4SZ2,4TQ6,51PW,98BT,B23O,C88N,J7F9,L7J7,O21U,O6R6,Q6O0,R9K3,T54A,U26C,UK24,X19V}
+                        the team ID
+  -s SUBJECTS [SUBJECTS ...], --subjects SUBJECTS [SUBJECTS ...]
+                        a list of subjects to be selected
+  -n NSUBJECTS, --nsubjects NSUBJECTS
+                        the number of subjects to be selected
+  -r RSUBJECTS, --rsubjects RSUBJECTS
+                        the number of subjects to be selected randomly
+  -l {p,r,s,g} [{p,r,s,g} ...], --levels {p,r,s,g} [{p,r,s,g} ...]
+                        the analysis levels to run (p=preprocessing, r=run, s=subject, g=group)
+  -c, --check           check pipeline outputs (runner is not launched)
+  -e, --exclusions      run the analyses without the excluded subjects

-narps_open_runner -t 2T6S -s 001 006 020 100
-narps_open_runner -t 2T6S -r 4
-narps_open_runner -t 2T6S -r 4 -f
-narps_open_runner -t 2T6S -r 4 -f -c # Check the output files without launching the runner
+narps_open_runner -t 2T6S -s 001 006 020 100 # Launches the full pipeline on the given subjects
+narps_open_runner -t 2T6S -r 4 # Launches the full pipeline on 4 random subjects
+narps_open_runner -t 2T6S -r 4 -l s # Launches the subject level of the pipeline on 4 random subjects
+narps_open_runner -t 2T6S -r 4 -l p r s -c # Check the output files of the preprocessing, run level and subject level parts of the pipeline, without launching it
```
> [!NOTE]
@@ -36,7 +45,7 @@ narps_open_runner -t 2T6S -r 4 -f -c # Check the output files without launching
The class `PipelineRunner` is available from the `narps_open.runner` module. You can use it from inside Python code, as follows:
```python
-from narps_open.runner import PipelineRunner
+from narps_open.runner import PipelineRunner, PipelineRunnerLevel

# Initialize a PipelineRunner by choosing the team ID
runner = PipelineRunner(team_id = '2T6S')
@@ -53,16 +62,19 @@ runner.subjects = ['001', '006', '020', '100']
# Alternatively, ask the runner to pick a random number of subjects
# runner.random_nb_subjects = 4

-# Start the runner
+# Start the runner (all available levels)
runner.start()

-# Or start the first level only (preprocessing + run level + subject level)
-runner.start(True, False)
+# Start the subject level only
+runner.start(PipelineRunnerLevel.SUBJECT)

-# Or start the second level only (group level)
-runner.start(True, True)
+# Or start the "first level" (preprocessing + run level + subject level)
+runner.start(PipelineRunnerLevel.FIRST)

+# Or start the group level only
+runner.start(PipelineRunnerLevel.GROUP)

# Get the list of missing files (if any) after the pipeline finished
-runner.get_missing_first_level_outputs()
-runner.get_missing_group_level_outputs()
+runner.get_missing_outputs() # for all available levels
+runner.get_missing_outputs(PipelineRunnerLevel.PREPROCESSING) # for preprocessing only
```
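Because `PipelineRunnerLevel` is a `Flag` enum, several levels can also be combined in a single call with the bitwise `|` operator. A minimal sketch along the lines of the example above (illustrative, not part of this commit):

```python
from narps_open.runner import PipelineRunner, PipelineRunnerLevel

runner = PipelineRunner(team_id = '2T6S')
runner.subjects = ['001', '006', '020', '100']

# Run the run level and subject level analyses in one call
runner.start(PipelineRunnerLevel.RUN | PipelineRunnerLevel.SUBJECT)

# Then list missing outputs for those two levels only
runner.get_missing_outputs(PipelineRunnerLevel.RUN | PipelineRunnerLevel.SUBJECT)
```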
10 changes: 9 additions & 1 deletion docs/testing.md
@@ -43,7 +43,7 @@ Tests can be launched manually or while using CI (Continuous Integration).
* To run tests with a given mark 'mark': `pytest -m 'mark'`
* To create code coverage data: `coverage run -m pytest ./tests` then `coverage report` to see the code coverage result or `coverage xml` to output a .xml report file

-## Command line tool
+## Command line tools

We created the simple command line tool `narps_open_tester` to help test the outcome of one pipeline.

@@ -58,6 +58,14 @@ This will run the pipeline for the requested team -here 08MQ- on subsets of subjects

Once finished, a text file report (`test_pipeline-*.txt`) is created, containing all the computed correlation values.

+The command line tool `narps_open_correlations` is also available and can be used as follows:
+
+```bash
+narps_open_correlations -t 2T6S -n 60
+```
+
+to get the correlation values for the results of a previously executed pipeline (here team 2T6S, with 60 subjects).
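The tool reports correlations between result images. As a rough illustration of what such a value is (a sketch, not the tool's actual implementation; file names are hypothetical), a voxel-wise Pearson correlation can be computed with nibabel and numpy:

```python
import nibabel as nib
import numpy as np

# Load a team's unthresholded statistic map and a reference map (hypothetical files)
team_map = nib.load('team_2T6S_hypo1_unthresholded.nii.gz').get_fdata()
reference_map = nib.load('narps_mean_hypo1.nii.gz').get_fdata()

# Pearson correlation over voxels, assuming both maps are in the same space
mask = ~np.isnan(team_map) & ~np.isnan(reference_map)
print(np.corrcoef(team_map[mask], reference_map[mask])[0, 1])
```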

## Configuration files for testing

* `pytest.ini` is a global configuration file for using pytest (see reference [here](https://docs.pytest.org/en/7.1.x/reference/customize.html)). It allows [registering markers](https://docs.pytest.org/en/7.1.x/example/markers.html) that help to better identify tests. Note that `pytest.ini` could be replaced by data inside `pyproject.toml` in future versions.
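For instance, a marker registered in `pytest.ini` is then attached to tests with the `pytest.mark` decorator. A minimal sketch (the `unit_test` marker name is an assumption, not necessarily one this project registers):

```python
import pytest

@pytest.mark.unit_test  # assumed marker name, registered in pytest.ini
def test_addition():
    assert 1 + 1 == 2
```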
2 changes: 1 addition & 1 deletion narps_open/pipelines/team_V55J.py
@@ -136,7 +136,7 @@ def get_preprocessing(self):

        # COREGISTER - Coregistration from anat to realigned func mean image
        # We kept the default values for all other parameters.
-        # TODO apply to files ... but reverse tansform ?
+        # TODO apply to files ... but reverse transform ?
        coregistration = Node(Coregister(), name = 'coregistration')
        coregistration.inputs.jobtype = 'estimate'
        coregistration.inputs.write_mask = False
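For context on the TODO above: with nipype's SPM interface, a coregistration estimated between two images can be carried over to other images through `apply_to_files`. A hedged sketch (file names are hypothetical, and this is not the pipeline's actual wiring):

```python
from nipype import Node
from nipype.interfaces.spm import Coregister

coregistration = Node(Coregister(), name = 'coregistration')
coregistration.inputs.jobtype = 'estimate'
coregistration.inputs.target = 'mean_func.nii'         # hypothetical reference image
coregistration.inputs.source = 'anat.nii'              # hypothetical moving image
coregistration.inputs.apply_to_files = ['tissues.nii'] # images to carry along
```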
174 changes: 110 additions & 64 deletions narps_open/runner.py
@@ -7,6 +7,7 @@
from importlib import import_module
from random import choices
from argparse import ArgumentParser
+from enum import Flag, auto

from nipype import Workflow, config

@@ -19,6 +20,17 @@
from narps_open.utils.configuration import Configuration
from narps_open.pipelines import get_implemented_pipelines

+class PipelineRunnerLevel(Flag):
+    """ A class to enumerate possible levels for a pipeline. """
+    NONE = 0
+    PREPROCESSING = auto()
+    RUN = auto()
+    SUBJECT = auto()
+    GROUP = auto()
+    ALL = PREPROCESSING | RUN | SUBJECT | GROUP
+    FIRST = PREPROCESSING | RUN | SUBJECT
+    SECOND = GROUP
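Because this is a `Flag` enum, level values compose and test with bitwise operators, which is exactly what the runner relies on below. A quick illustrative sketch (not part of the diff):

```python
from narps_open.runner import PipelineRunnerLevel

level = PipelineRunnerLevel.FIRST
assert bool(level & PipelineRunnerLevel.RUN)        # RUN is included in FIRST
assert not bool(level & PipelineRunnerLevel.GROUP)  # GROUP is not
assert PipelineRunnerLevel.SECOND == PipelineRunnerLevel.GROUP  # SECOND is an alias
```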

class PipelineRunner():
    """ A class that allows to run a NARPS pipeline. """

@@ -87,75 +99,101 @@ def team_id(self, value: str) -> None:
            implemented_pipelines[self._team_id])
        self._pipeline = class_type()

-    def start(self, first_level_only: bool = False, group_level_only: bool = False) -> None:
+    @staticmethod
+    def get_workflows(input_workflow):
+        """
+        Return a list of nipype.Workflow from the passed argument.
+        Arguments:
+            - input_workflow: either a list of nipype.Workflow or a nipype.Workflow
+        Returns:
+            - a list of nipype.Workflow:
+                - [input_workflow] if input_workflow is a nipype.Workflow
+                - input_workflow if input_workflow is a list of nipype.Workflow
+                - an empty list if input_workflow is None
+        """
+        if isinstance(input_workflow, Workflow):
+            return [input_workflow]
+        if input_workflow is None:
+            return []
+        if isinstance(input_workflow, list):
+            for sub_workflow in input_workflow:
+                if not isinstance(sub_workflow, Workflow):
+                    raise AttributeError('When using a list of workflows,\
+                        all elements must be of type nipype.Workflow')
+            return input_workflow
+        raise AttributeError('Workflow must be of type list or nipype.Workflow')
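A short sketch of what this normalization guarantees to callers (illustrative only, assuming nipype is installed):

```python
from nipype import Workflow
from narps_open.runner import PipelineRunner

workflow = Workflow(name = 'example_workflow')
assert PipelineRunner.get_workflows(workflow) == [workflow]    # a single workflow is wrapped
assert PipelineRunner.get_workflows([workflow]) == [workflow]  # a list passes through
assert PipelineRunner.get_workflows(None) == []                # None becomes an empty list
```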

+    def start(self, level: PipelineRunnerLevel = PipelineRunnerLevel.ALL) -> None:
        """
        Start the pipeline
        Arguments:
-            - first_level_only: bool (False by default), run the first level workflows only,
-              (= preprocessing + run level + subject_level)
-            - group_level_only: bool (False by default), run the group level workflows only
+            - level: PipelineRunnerLevel, indicates which workflow(s) to run
        """
        # Set global nipype config for pipeline execution
        config.update_config(dict(execution = {'stop_on_first_crash': 'True'}))

        # Disclaimer
        print('Starting pipeline for team: '+
            f'{self.team_id}, with {len(self.subjects)} subjects: {self.subjects}')

-        if first_level_only and group_level_only:
-            raise AttributeError('first_level_only and group_level_only cannot both be True')
-
-        # Generate workflow lists
-        first_level_workflows = []
-        group_level_workflows = []
-
-        if not group_level_only:
-            for workflow in [
-                self._pipeline.get_preprocessing(),
-                self._pipeline.get_run_level_analysis(),
-                self._pipeline.get_subject_level_analysis()]:
-
-                if isinstance(workflow, list):
-                    for sub_workflow in workflow:
-                        first_level_workflows.append(sub_workflow)
-                else:
-                    first_level_workflows.append(workflow)
-
-        if not first_level_only:
-            for workflow in [self._pipeline.get_group_level_analysis()]:
-                if isinstance(workflow, list):
-                    for sub_workflow in workflow:
-                        group_level_workflows.append(sub_workflow)
-                else:
-                    group_level_workflows.append(workflow)
+        print(f'\tThe following levels will be run: {level}')
+
+        # Generate workflow list & level list
+        workflows = []
+        levels = []
+        if bool(level & PipelineRunnerLevel.PREPROCESSING):
+            workflows += self.get_workflows(self._pipeline.get_preprocessing())
+            levels.append(PipelineRunnerLevel.PREPROCESSING)
+        if bool(level & PipelineRunnerLevel.RUN):
+            workflows += self.get_workflows(self._pipeline.get_run_level_analysis())
+            levels.append(PipelineRunnerLevel.RUN)
+        if bool(level & PipelineRunnerLevel.SUBJECT):
+            workflows += self.get_workflows(self._pipeline.get_subject_level_analysis())
+            levels.append(PipelineRunnerLevel.SUBJECT)
+        if bool(level & PipelineRunnerLevel.GROUP):
+            workflows += self.get_workflows(self._pipeline.get_group_level_analysis())
+            levels.append(PipelineRunnerLevel.GROUP)

        # Launch workflows
-        for workflow in first_level_workflows + group_level_workflows:
-            if workflow is None:
-                pass
-            elif not isinstance(workflow, Workflow):
-                raise AttributeError('Workflow must be of type nipype.Workflow')
+        for workflow, current_level in zip(workflows, levels):
+            nb_procs = Configuration()['runner']['nb_procs']
+            if nb_procs > 1:
+                workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
            else:
-                nb_procs = Configuration()['runner']['nb_procs']
-                if nb_procs > 1:
-                    workflow.run('MultiProc', plugin_args = {'n_procs': nb_procs})
-                else:
-                    workflow.run()
+                workflow.run()
+            self.get_missing_outputs(current_level)

-    def get_missing_first_level_outputs(self):
-        """ Return the list of missing files after computations of the first level """
-        files = self._pipeline.get_preprocessing_outputs()
-        files += self._pipeline.get_run_level_outputs()
-        files += self._pipeline.get_subject_level_outputs()
-
-        return [f for f in files if not isfile(f)]
-
-    def get_missing_group_level_outputs(self):
-        """ Return the list of missing files after computations of the group level """
-        files = self._pipeline.get_group_level_outputs()
-
-        return [f for f in files if not isfile(f)]
+    def get_missing_outputs(self, level: PipelineRunnerLevel = PipelineRunnerLevel.ALL):
+        """
+        Return the list of missing files after computations of the level(s)
+        Arguments:
+            - level: PipelineRunnerLevel, indicates for which workflow(s) to search output files
+        """
+        # Generate files list
+        files = []
+        if bool(level & PipelineRunnerLevel.PREPROCESSING):
+            files += self._pipeline.get_preprocessing_outputs()
+        if bool(level & PipelineRunnerLevel.RUN):
+            files += self._pipeline.get_run_level_outputs()
+        if bool(level & PipelineRunnerLevel.SUBJECT):
+            files += self._pipeline.get_subject_level_outputs()
+        if bool(level & PipelineRunnerLevel.GROUP):
+            files += self._pipeline.get_group_level_outputs()
+
+        # Get non existing files
+        missing = [f for f in files if not isfile(f)]
+
+        # Disclaimer
+        if missing:
+            print('There are missing files for team: '+
+                f'{self.team_id}, with {len(self.subjects)} subjects: {self.subjects}')
+            print(missing)
+
+        # Return non existing files
+        return missing

def main():
""" Entry-point for the command line tool narps_open_runner """
Expand All @@ -171,11 +209,10 @@ def main():
        help='the number of subjects to be selected')
    subjects.add_argument('-r', '--rsubjects', type=str,
        help='the number of subjects to be selected randomly')
-    levels = parser.add_mutually_exclusive_group(required=False)
-    levels.add_argument('-g', '--group', action='store_true', default=False,
-        help='run the group level only')
-    levels.add_argument('-f', '--first', action='store_true', default=False,
-        help='run the first levels only (preprocessing + subjects + runs)')
+    parser.add_argument('-l', '--levels', nargs='+', type=str, action='extend',
+        choices=['p', 'r', 's', 'g'],
+        help='the analysis levels to run (p=preprocessing, r=run, s=subject, g=group)'
+        )
    parser.add_argument('-c', '--check', action='store_true', required=False,
        help='check pipeline outputs (runner is not launched)')
    parser.add_argument('-e', '--exclusions', action='store_true', required=False,
@@ -194,7 +231,7 @@ def main():
    runner.pipeline.directories.set_output_dir_with_team_id(arguments.team)
    runner.pipeline.directories.set_working_dir_with_team_id(arguments.team)

-    # Handle subject
+    # Handle subjects
    if arguments.subjects is not None:
        runner.subjects = arguments.subjects
    elif arguments.rsubjects is not None:
@@ -209,18 +246,27 @@ def main():
    else:
        runner.nb_subjects = int(arguments.nsubjects)

+    # Build pipeline runner level
+    if arguments.levels is None:
+        level = PipelineRunnerLevel.ALL
+    else:
+        level = PipelineRunnerLevel.NONE
+        if 'p' in arguments.levels:
+            level |= PipelineRunnerLevel.PREPROCESSING
+        if 'r' in arguments.levels:
+            level |= PipelineRunnerLevel.RUN
+        if 's' in arguments.levels:
+            level |= PipelineRunnerLevel.SUBJECT
+        if 'g' in arguments.levels:
+            level |= PipelineRunnerLevel.GROUP
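An equivalent, more compact mapping (an editor's sketch, not the committed code) could fold the four checks into a dict and `functools.reduce`:

```python
from functools import reduce
from narps_open.runner import PipelineRunnerLevel

LEVELS = {
    'p': PipelineRunnerLevel.PREPROCESSING,
    'r': PipelineRunnerLevel.RUN,
    's': PipelineRunnerLevel.SUBJECT,
    'g': PipelineRunnerLevel.GROUP,
    }

def build_level(letters):
    """ Combine selected level flags; None selects all levels. """
    if letters is None:
        return PipelineRunnerLevel.ALL
    return reduce(lambda acc, key: acc | LEVELS[key], letters, PipelineRunnerLevel.NONE)

assert build_level(None) == PipelineRunnerLevel.ALL
assert build_level(['p', 'r', 's']) == PipelineRunnerLevel.FIRST
```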

    # Check data
    if arguments.check:
-        print('Missing files for team', arguments.team, 'after running',
-            len(runner.pipeline.subject_list), 'subjects:')
-        if not arguments.group:
-            print('First level:', runner.get_missing_first_level_outputs())
-        if not arguments.first:
-            print('Group level:', runner.get_missing_group_level_outputs())
+        runner.get_missing_outputs(level)

    # Start the runner
    else:
-        runner.start(arguments.first, arguments.group)
+        runner.start(level)

if __name__ == '__main__':
    main()