Skip to content

Commit

Permalink
Remove TaskWithPriorityMixin and only apply priority to SubmitExperimentToGemma
Browse files Browse the repository at this point in the history
  • Loading branch information
arteymix committed Oct 12, 2023
1 parent c6174e2 commit b8699b2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
28 changes: 15 additions & 13 deletions rnaseq_pipeline/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .sources.local import DownloadLocalSample, DownloadLocalExperiment
from .sources.sra import DownloadSraProject, DownloadSraExperiment, ExtractSraProjectBatchInfo
from .targets import GemmaDatasetPlatform, GemmaDatasetHasBatch, RsemReference
from .utils import no_retry, IlluminaFastqHeader, TaskWithPriorityMixin, RerunnableTaskMixin, remove_task_output
from .utils import no_retry, IlluminaFastqHeader, RerunnableTaskMixin, remove_task_output
from .gemma import GemmaTask

logger = logging.getLogger('luigi-interface')
Expand Down Expand Up @@ -53,7 +53,7 @@ def requires(self):
else:
raise ValueError('Unknown source for sample: {}.'.format(self.source))

class DownloadExperiment(TaskWithPriorityMixin, TaskWithOutputMixin, WrapperTask):
class DownloadExperiment(TaskWithOutputMixin, WrapperTask):
"""
This is a generic task that detects which kind of experiment is intended to
be downloaded so that downstream tasks can process regardless of the data
Expand Down Expand Up @@ -134,7 +134,7 @@ def run(self):
else:
raise NotImplementedError('Trimming more than two mates is not supported.')

class TrimExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask):
class TrimExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
"""
Quality control all the samples in a given experiment.
"""
Expand All @@ -160,7 +160,7 @@ def run(self):
os.makedirs(destdir, exist_ok=True)
yield [fastqc.GenerateReport(fastq_in.path, destdir) for fastq_in in self.input()]

class QualityControlExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask):
class QualityControlExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
"""
Quality control all the samples in a given experiment.
"""
Expand Down Expand Up @@ -289,7 +289,7 @@ def program_args(self):
def output(self):
return luigi.LocalTarget(self._get_output_prefix() + f'.{self.scope}.results')

class AlignExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask):
class AlignExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask):
"""
Align all the samples in a given experiment.
Expand Down Expand Up @@ -317,7 +317,7 @@ def run(self):

@no_retry
@requires(TrimExperiment, QualityControlExperiment, AlignExperiment)
class GenerateReportForExperiment(TaskWithPriorityMixin, RerunnableTaskMixin, luigi.Task):
class GenerateReportForExperiment(RerunnableTaskMixin, luigi.Task):
"""
Generate a summary report for an experiment with MultiQC.
Expand All @@ -336,7 +336,7 @@ def output(self):
return luigi.LocalTarget(join(cfg.OUTPUT_DIR, 'report', self.reference_id, self.experiment_id, 'multiqc_report.html'))

@requires(AlignExperiment)
class CountExperiment(TaskWithPriorityMixin, luigi.Task):
class CountExperiment(luigi.Task):
"""
Combine the RSEM quantifications results from all the samples in a given
experiment.
Expand All @@ -362,7 +362,7 @@ def output(self):
return [luigi.LocalTarget(join(destdir, f'{self.experiment_id}_counts.{self.scope}')),
luigi.LocalTarget(join(destdir, f'{self.experiment_id}_fpkm.{self.scope}'))]

class SubmitExperimentBatchInfoToGemma(TaskWithPriorityMixin, GemmaTask):
class SubmitExperimentBatchInfoToGemma(GemmaTask):
"""
Submit the batch information of an experiment to Gemma.
"""
Expand All @@ -388,7 +388,7 @@ def output(self):
return GemmaDatasetHasBatch(self.experiment_id)

@no_retry
class SubmitExperimentDataToGemma(TaskWithPriorityMixin, RerunnableTaskMixin, GemmaTask):
class SubmitExperimentDataToGemma(RerunnableTaskMixin, GemmaTask):
"""
Submit an experiment to Gemma.
Expand Down Expand Up @@ -419,7 +419,7 @@ def subcommand_args(self):
def output(self):
return GemmaDatasetPlatform(self.experiment_id, self.platform_short_name)

class SubmitExperimentReportToGemma(TaskWithPriorityMixin, WrapperTask, GemmaTask):
class SubmitExperimentReportToGemma(WrapperTask, GemmaTask):
"""
Submit an experiment QC report to Gemma.
Expand All @@ -434,7 +434,7 @@ def run(self):
pass # do nothing

@requires(SubmitExperimentDataToGemma, SubmitExperimentBatchInfoToGemma, SubmitExperimentReportToGemma)
class SubmitExperimentToGemma(TaskWithPriorityMixin, TaskWithOutputMixin, WrapperTask):
class SubmitExperimentToGemma(TaskWithOutputMixin, WrapperTask):
"""
Submit an experiment data, QC reports, and batch information to Gemma.
Expand All @@ -448,6 +448,8 @@ class SubmitExperimentToGemma(TaskWithPriorityMixin, TaskWithOutputMixin, Wrappe
retry_count = 1
retry_delay = 1200

priority = luigi.IntParameter(default=100, positional=False, significant=False)

def _targets_to_remove(self):
outs = []
# original data
Expand Down Expand Up @@ -482,12 +484,12 @@ def complete(self):
"""
return super().complete() and all(not out.exists() for out in self._targets_to_remove())

class SubmitExperimentsFromDataFrameMixin(TaskWithPriorityMixin):
class SubmitExperimentsFromDataFrameMixin:
ignore_priority = luigi.BoolParameter(positional=False, significant=False, description='Ignore the priority column and inherit the priority of the this task. Rows with zero priority are nonetheless ignored.')
def requires(self):
df = self._retrieve_dataframe()
# using None, the worker will inherit the priority from this task for all its dependencies
return [SubmitExperimentToGemma(row.experiment_id, priority=None if self.ignore_priority else row.get('priority'), rerun=row.get('data')=='resubmit')
return [SubmitExperimentToGemma(row.experiment_id, priority=100 if self.ignore_priority else row.get('priority'), rerun=row.get('data')=='resubmit')
for _, row in df.iterrows() if row.get('priority', 1) > 0]

class SubmitExperimentsFromFileToGemma(SubmitExperimentsFromDataFrameMixin, TaskWithOutputMixin, WrapperTask):
Expand Down
4 changes: 0 additions & 4 deletions rnaseq_pipeline/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ def wrapper(cls):

no_retry = max_retry(0)

class TaskWithPriorityMixin:
"""Mixin that adds a --priority flag to a given task."""
priority = luigi.IntParameter(default=0, positional=False, significant=False)

class RerunnableTaskMixin:
"""
Mixin for a task that can be rerun regardless of its completion status.
Expand Down

0 comments on commit b8699b2

Please sign in to comment.