Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subworkflow language server fixes #593

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### `Changed`

- [#591](https://github.com/genomic-medicine-sweden/nallo/pull/591) - Updated version to 0.6.0dev
- [#593](https://github.com/genomic-medicine-sweden/nallo/pull/593) - Updated local subworkflow to fix Nextflow language server issues

### `Removed`

Expand Down
251 changes: 119 additions & 132 deletions subworkflows/local/utils_nfcore_nallo_pipeline/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -17,109 +17,6 @@ include { imNotification } from '../../nf-core/utils_nfcore_pipeline'
include { UTILS_NFCORE_PIPELINE } from '../../nf-core/utils_nfcore_pipeline'
include { UTILS_NEXTFLOW_PIPELINE } from '../../nf-core/utils_nextflow_pipeline'

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DEFINE DEPENDENCIES (FILES AND WORKFLOWS) FOR OTHER WORKFLOWS
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

//
// nf-validation does not support contitional file and params validation,
// add these here.
//

//
// Define subworkflows and their associated "--skip"
//
def workflowSkips = [
assembly : "skip_genome_assembly",
mapping : "skip_alignment",
snv_calling : "skip_snv_calling",
snv_annotation : "skip_snv_annotation",
sv_calling : "skip_sv_calling",
sv_annotation : "skip_sv_annotation",
call_paralogs : "skip_call_paralogs",
cnv_calling : "skip_cnv_calling",
phasing : "skip_phasing",
rank_variants : "skip_rank_variants",
repeat_calling : "skip_repeat_calling",
repeat_annotation: "skip_repeat_annotation",
methylation : "skip_methylation_pileups",
qc : "skip_qc",
]

//
// E.g., the CNV-calling workflow depends on mapping and snv_calling and can't run without them.
//
def workflowDependencies = [
call_paralogs : ["mapping"],
snv_calling : ["mapping"],
qc : ["mapping"],
sv_calling : ["mapping"],
sv_annotation : ["mapping", "cnv_calling", "sv_calling"],
snv_annotation : ["mapping", "snv_calling"],
cnv_calling : ["mapping", "snv_calling"],
phasing : ["mapping", "snv_calling"],
rank_variants : ["mapping", "snv_calling", "snv_annotation", "sv_annotation"],
repeat_calling : ["mapping", "snv_calling", "phasing"],
repeat_annotation: ["mapping", "snv_calling", "phasing", "repeat_calling"],
methylation : ["mapping", "snv_calling"]
]

//
// E.g., the par_regions file is required by the assembly workflow and the assembly workflow can't run without par_regions
//
def fileDependencies = [
mapping : ["fasta", "somalier_sites"],
assembly : ["fasta"], // The assembly workflow should perhaps be split into two - assembly and alignment (requires ref)
snv_calling : ["fasta", "par_regions"],
snv_annotation : ["echtvar_snv_databases", "vep_cache", "vep_plugin_files", "variant_consequences_snvs"],
sv_calling : ["fasta"],
sv_annotation : ["svdb_sv_databases", "vep_cache", "vep_plugin_files", "variant_consequences_svs"],
cnv_calling : ["hificnv_expected_xy_cn", "hificnv_expected_xx_cn", "hificnv_excluded_regions"],
rank_variants : ["genmod_reduced_penetrance", "genmod_score_config_snvs", "genmod_score_config_svs"],
repeat_calling : ["trgt_repeats"],
repeat_annotation: ["stranger_repeat_catalog"],
]

def parameterStatus = [
workflow: [
skip_snv_calling : params.skip_snv_calling,
skip_phasing : params.skip_phasing,
skip_methylation_pileups : params.skip_methylation_pileups,
skip_rank_variants : params.skip_rank_variants,
skip_repeat_calling : params.skip_repeat_calling,
skip_repeat_annotation : params.skip_repeat_annotation,
skip_snv_annotation : params.skip_snv_annotation,
skip_sv_calling : params.skip_sv_calling,
skip_sv_annotation : params.skip_sv_annotation,
skip_call_paralogs : params.skip_call_paralogs,
skip_cnv_calling : params.skip_cnv_calling,
skip_alignment : params.skip_alignment,
skip_qc : params.skip_qc,
skip_genome_assembly : params.skip_genome_assembly,
],
files: [
par_regions : params.par_regions,
echtvar_snv_databases : params.echtvar_snv_databases,
svdb_sv_databases : params.svdb_sv_databases,
somalier_sites : params.somalier_sites,
vep_cache : params.vep_cache,
hificnv_expected_xy_cn : params.hificnv_expected_xy_cn,
hificnv_expected_xx_cn : params.hificnv_expected_xx_cn,
hificnv_excluded_regions : params.hificnv_excluded_regions,
fasta : params.fasta,
trgt_repeats : params.trgt_repeats,
stranger_repeat_catalog : params.stranger_repeat_catalog,
genmod_reduced_penetrance: params.genmod_reduced_penetrance,
genmod_score_config_snvs : params.genmod_score_config_snvs,
genmod_score_config_svs : params.genmod_score_config_svs,
variant_consequences_snvs: params.variant_consequences_snvs,
variant_consequences_svs : params.variant_consequences_svs,
vep_plugin_files : params.vep_plugin_files,
]
]

/*
SUBWORKFLOW TO INITIALISE PIPELINE
*/
Expand Down Expand Up @@ -164,6 +61,103 @@ workflow PIPELINE_INITIALISATION {
nextflow_cli_args
)

    //
    // nf-validation does not support conditional file and params validation,
    // so add these checks here.
    //

//
// Define subworkflows and their associated "--skip"
//
def workflowSkips = [
assembly : "skip_genome_assembly",
mapping : "skip_alignment",
snv_calling : "skip_snv_calling",
snv_annotation : "skip_snv_annotation",
sv_calling : "skip_sv_calling",
sv_annotation : "skip_sv_annotation",
call_paralogs : "skip_call_paralogs",
cnv_calling : "skip_cnv_calling",
phasing : "skip_phasing",
rank_variants : "skip_rank_variants",
repeat_calling : "skip_repeat_calling",
repeat_annotation: "skip_repeat_annotation",
methylation : "skip_methylation_pileups",
qc : "skip_qc",
]

//
// E.g., the CNV-calling workflow depends on mapping and snv_calling and can't run without them.
//
def workflowDependencies = [
call_paralogs : ["mapping"],
snv_calling : ["mapping"],
qc : ["mapping"],
sv_calling : ["mapping"],
sv_annotation : ["mapping", "cnv_calling", "sv_calling"],
snv_annotation : ["mapping", "snv_calling"],
cnv_calling : ["mapping", "snv_calling"],
phasing : ["mapping", "snv_calling"],
rank_variants : ["mapping", "snv_calling", "snv_annotation", "sv_annotation"],
repeat_calling : ["mapping", "snv_calling", "phasing"],
repeat_annotation: ["mapping", "snv_calling", "phasing", "repeat_calling"],
methylation : ["mapping", "snv_calling"]
]

//
// E.g., the par_regions file is required by the assembly workflow and the assembly workflow can't run without par_regions
//
def fileDependencies = [
mapping : ["fasta", "somalier_sites"],
assembly : ["fasta"], // The assembly workflow should perhaps be split into two - assembly and alignment (requires ref)
snv_calling : ["fasta", "par_regions"],
snv_annotation : ["echtvar_snv_databases", "vep_cache", "vep_plugin_files", "variant_consequences_snvs"],
sv_calling : ["fasta"],
sv_annotation : ["svdb_sv_databases", "vep_cache", "vep_plugin_files", "variant_consequences_svs"],
cnv_calling : ["hificnv_expected_xy_cn", "hificnv_expected_xx_cn", "hificnv_excluded_regions"],
rank_variants : ["genmod_reduced_penetrance", "genmod_score_config_snvs", "genmod_score_config_svs"],
repeat_calling : ["trgt_repeats"],
repeat_annotation: ["stranger_repeat_catalog"],
]

def parameterStatus = [
workflow: [
skip_snv_calling : params.skip_snv_calling,
skip_phasing : params.skip_phasing,
skip_methylation_pileups : params.skip_methylation_pileups,
skip_rank_variants : params.skip_rank_variants,
skip_repeat_calling : params.skip_repeat_calling,
skip_repeat_annotation : params.skip_repeat_annotation,
skip_snv_annotation : params.skip_snv_annotation,
skip_sv_calling : params.skip_sv_calling,
skip_sv_annotation : params.skip_sv_annotation,
skip_call_paralogs : params.skip_call_paralogs,
skip_cnv_calling : params.skip_cnv_calling,
skip_alignment : params.skip_alignment,
skip_qc : params.skip_qc,
skip_genome_assembly : params.skip_genome_assembly,
],
files: [
par_regions : params.par_regions,
echtvar_snv_databases : params.echtvar_snv_databases,
svdb_sv_databases : params.svdb_sv_databases,
somalier_sites : params.somalier_sites,
vep_cache : params.vep_cache,
hificnv_expected_xy_cn : params.hificnv_expected_xy_cn,
hificnv_expected_xx_cn : params.hificnv_expected_xx_cn,
hificnv_excluded_regions : params.hificnv_excluded_regions,
fasta : params.fasta,
trgt_repeats : params.trgt_repeats,
stranger_repeat_catalog : params.stranger_repeat_catalog,
genmod_reduced_penetrance: params.genmod_reduced_penetrance,
genmod_score_config_snvs : params.genmod_score_config_snvs,
genmod_score_config_svs : params.genmod_score_config_svs,
variant_consequences_snvs: params.variant_consequences_snvs,
variant_consequences_svs : params.variant_consequences_svs,
vep_plugin_files : params.vep_plugin_files,
]
]

//
// Custom validation for pipeline parameters
//
Expand All @@ -187,9 +181,8 @@ workflow PIPELINE_INITIALISATION {
[ sample, metas[0] + [n_files: metas.size() + metas.size() * Math.max(0, params.alignment_processes - 1), single_end:true ], reads ]
}
// Convert back to [ meta, reads ]
.flatMap {
sample, meta, reads ->
reads.collect { return [ meta, it ] }
.flatMap { _sample, meta, reads ->
reads.collect { return [ meta, it ] }
}
.set { ch_samplesheet }

Expand Down Expand Up @@ -271,7 +264,7 @@ def validateInputParameters(statusMap, workflowMap, workflowDependencies, fileDe
//
def validateInputSamplesheet(input) {
    // Filenames need to be unique for each sample to avoid collisions when merging
fileNames = input[2].collect { new File(it.toString()).name }
def fileNames = input[2].collect { new File(it.toString()).name }
if (fileNames.size() != fileNames.unique().size()) {
error "Error: Input filenames needs to be unique for each sample."
}
Expand Down Expand Up @@ -336,7 +329,7 @@ def citationBibliographyText(ch_versions, references_yaml, description) {

def unwantedReferences = ['genomic-medicine-sweden/nallo', 'Nextflow']
// These are not collected in ch_versions but should be referenced
baseTools = Channel.from(['nextflow', 'nf_core', 'bioconda', 'biocontainers', 'multiqc'])
def baseTools = Channel.from(['nextflow', 'nf_core', 'bioconda', 'biocontainers', 'multiqc'])

ch_versions
.map { module_yaml -> extractSoftwareFromVersions(module_yaml) }
Expand All @@ -361,19 +354,13 @@ def citationBibliographyText(ch_versions, references_yaml, description) {
def validateParameterCombinations(statusMap, workflowMap, workflowDependencies, fileDependencies) {
// Array to store errors
def errors = []
// For each of the "workflow", "files"
statusMap.each { paramsType, allParams ->
// Go through all params and their status
statusMap[paramsType].each { param, paramStatus ->
switch (paramsType) {
case "files":
checkFileDependencies(param, fileDependencies, statusMap, workflowMap, errors)
break
case "workflow":
checkWorkflowDependencies(param, workflowDependencies, statusMap, workflowMap, errors)
break
default:
break
        // For each of the "workflow", "files" - TODO: All errors are now coming from other parts of the workflow
statusMap.each { paramsType, paramsMap ->
paramsMap.each { param, _paramStatus ->
if (paramsType == "files") {
checkFileDependencies(param, fileDependencies, statusMap, workflowMap, errors)
} else if (paramsType == "workflow") {
checkWorkflowDependencies(param, workflowDependencies, statusMap, workflowMap, errors)
}
}
}
Expand All @@ -393,10 +380,10 @@ def validateParameterCombinations(statusMap, workflowMap, workflowDependencies,
def checkWorkflowDependencies(String skip, Map combinationsMap, Map statusMap, Map workflowMap, List errors) {

// Lookup the workflow associated with the --skip_xxx parameter
currentWorkflow = workflowMap.find { key, mapValue -> mapValue == skip }?.key
def currentWorkflow = workflowMap.find { _key, mapValue -> mapValue == skip }?.key

// If the --skip is not set, then the workflow is active, give no error
workflowIsActive = !statusMap["workflow"][skip]
def workflowIsActive = !statusMap["workflow"][skip]
if(workflowIsActive) {
return
}
Expand Down Expand Up @@ -426,7 +413,7 @@ def checkFileDependencies(String file, Map combinationsMap, Map statusMap, Map w
// Get all workflows required by a file
def workflowThatRequiresFile = findKeysForValue(file, combinationsMap)

for (workflow in workflowThatRequiresFile) {
workflowThatRequiresFile.each { workflow ->
// Get the "--skip" for that workflow
def workflowSkip = workflowMap[workflow]
// Get the status of the "--skip", if false then workflow is active
Expand All @@ -449,11 +436,11 @@ def findRequiredSkips(paramType, Set<String> requiredWorkflows, Map statusMap, M

def requiredSkips = []

for (currentWorkflow in requiredWorkflows) {
requiredWorkflows.each { currentWorkflow ->
// Get the skip associated with the workflow
skip = workflowMap[currentWorkflow]
def skip = workflowMap[currentWorkflow]

workflowIsSkipped = !statusMap[paramType][skip]
def workflowIsSkipped = !statusMap[paramType][skip]

if(paramType == "workflow") {
if(workflowIsSkipped) {
Expand All @@ -468,7 +455,7 @@ def findKeysForValue(def valueToFind, Map map) {

def keys = []

for (entry in map) {
map.each { entry ->
def key = entry.key
def value = entry.value

Expand Down Expand Up @@ -505,14 +492,14 @@ def validateAllFamiliesHasAffectedSamples(ch_samplesheet, params) {
}

def familiesWithPhenotypes = ch_samplesheet
.map { meta, reads -> [ meta.family_id, meta.phenotype ] }
.map { meta, _reads -> [ meta.family_id, meta.phenotype ] }
.groupTuple()

def familiesWithoutAffected = familiesWithPhenotypes
.filter { family, phenotype -> !phenotype.contains(2) }
.filter { _family, phenotype -> !phenotype.contains(2) }

familiesWithoutAffected
.map { family, phenotype -> family }
.map { family, _phenotype -> family }
.collect()
.subscribe { familyList ->
if (familyList) {
Expand All @@ -523,7 +510,7 @@ def validateAllFamiliesHasAffectedSamples(ch_samplesheet, params) {

def validateSingleProjectPerRun(ch_samplesheet) {
ch_samplesheet
.map { meta, reads -> meta.project }
.map { meta, _reads -> meta.project }
.unique()
.count()
.map { n ->
Expand Down