diff --git a/rnaseq_pipeline/tasks.py b/rnaseq_pipeline/tasks.py index b6ab601a..27ec27de 100755 --- a/rnaseq_pipeline/tasks.py +++ b/rnaseq_pipeline/tasks.py @@ -22,7 +22,7 @@ from .sources.local import DownloadLocalSample, DownloadLocalExperiment from .sources.sra import DownloadSraProject, DownloadSraExperiment, ExtractSraProjectBatchInfo from .targets import GemmaDatasetPlatform, GemmaDatasetHasBatch, RsemReference -from .utils import no_retry, IlluminaFastqHeader, TaskWithPriorityMixin, RerunnableTaskMixin, remove_task_output +from .utils import no_retry, IlluminaFastqHeader, RerunnableTaskMixin, remove_task_output from .gemma import GemmaTask logger = logging.getLogger('luigi-interface') @@ -53,7 +53,7 @@ def requires(self): else: raise ValueError('Unknown source for sample: {}.'.format(self.source)) -class DownloadExperiment(TaskWithPriorityMixin, TaskWithOutputMixin, WrapperTask): +class DownloadExperiment(TaskWithOutputMixin, WrapperTask): """ This is a generic task that detects which kind of experiment is intended to be downloaded so that downstream tasks can process regardless of the data @@ -134,7 +134,7 @@ def run(self): else: raise NotImplementedError('Trimming more than two mates is not supported.') -class TrimExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask): +class TrimExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask): """ Quality control all the samples in a given experiment. """ @@ -160,7 +160,7 @@ def run(self): os.makedirs(destdir, exist_ok=True) yield [fastqc.GenerateReport(fastq_in.path, destdir) for fastq_in in self.input()] -class QualityControlExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask): +class QualityControlExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask): """ Quality control all the samples in a given experiment. 
""" @@ -289,7 +289,7 @@ def program_args(self): def output(self): return luigi.LocalTarget(self._get_output_prefix() + f'.{self.scope}.results') -class AlignExperiment(TaskWithPriorityMixin, DynamicTaskWithOutputMixin, DynamicWrapperTask): +class AlignExperiment(DynamicTaskWithOutputMixin, DynamicWrapperTask): """ Align all the samples in a given experiment. @@ -317,7 +317,7 @@ def run(self): @no_retry @requires(TrimExperiment, QualityControlExperiment, AlignExperiment) -class GenerateReportForExperiment(TaskWithPriorityMixin, RerunnableTaskMixin, luigi.Task): +class GenerateReportForExperiment(RerunnableTaskMixin, luigi.Task): """ Generate a summary report for an experiment with MultiQC. @@ -336,7 +336,7 @@ def output(self): return luigi.LocalTarget(join(cfg.OUTPUT_DIR, 'report', self.reference_id, self.experiment_id, 'multiqc_report.html')) @requires(AlignExperiment) -class CountExperiment(TaskWithPriorityMixin, luigi.Task): +class CountExperiment(luigi.Task): """ Combine the RSEM quantifications results from all the samples in a given experiment. @@ -362,7 +362,7 @@ def output(self): return [luigi.LocalTarget(join(destdir, f'{self.experiment_id}_counts.{self.scope}')), luigi.LocalTarget(join(destdir, f'{self.experiment_id}_fpkm.{self.scope}'))] -class SubmitExperimentBatchInfoToGemma(TaskWithPriorityMixin, GemmaTask): +class SubmitExperimentBatchInfoToGemma(GemmaTask): """ Submit the batch information of an experiment to Gemma. """ @@ -388,7 +388,7 @@ def output(self): return GemmaDatasetHasBatch(self.experiment_id) @no_retry -class SubmitExperimentDataToGemma(TaskWithPriorityMixin, RerunnableTaskMixin, GemmaTask): +class SubmitExperimentDataToGemma(RerunnableTaskMixin, GemmaTask): """ Submit an experiment to Gemma. 
@@ -419,7 +419,7 @@ def subcommand_args(self): def output(self): return GemmaDatasetPlatform(self.experiment_id, self.platform_short_name) -class SubmitExperimentReportToGemma(TaskWithPriorityMixin, WrapperTask, GemmaTask): +class SubmitExperimentReportToGemma(WrapperTask, GemmaTask): """ Submit an experiment QC report to Gemma. @@ -434,7 +434,7 @@ def run(self): pass # do nothing @requires(SubmitExperimentDataToGemma, SubmitExperimentBatchInfoToGemma, SubmitExperimentReportToGemma) -class SubmitExperimentToGemma(TaskWithPriorityMixin, TaskWithOutputMixin, WrapperTask): +class SubmitExperimentToGemma(TaskWithOutputMixin, WrapperTask): """ Submit an experiment data, QC reports, and batch information to Gemma. @@ -448,6 +448,8 @@ class SubmitExperimentToGemma(TaskWithPriorityMixin, TaskWithOutputMixin, Wrappe retry_count = 1 retry_delay = 1200 + priority = luigi.IntParameter(default=100, positional=False, significant=False) + def _targets_to_remove(self): outs = [] # original data @@ -482,12 +484,12 @@ def complete(self): """ return super().complete() and all(not out.exists() for out in self._targets_to_remove()) -class SubmitExperimentsFromDataFrameMixin(TaskWithPriorityMixin): +class SubmitExperimentsFromDataFrameMixin: ignore_priority = luigi.BoolParameter(positional=False, significant=False, description='Ignore the priority column and inherit the priority of the this task. 
Rows with zero priority are nonetheless ignored.') def requires(self): df = self._retrieve_dataframe() - # using None, the worker will inherit the priority from this task for all its dependencies - return [SubmitExperimentToGemma(row.experiment_id, priority=None if self.ignore_priority else row.get('priority'), rerun=row.get('data')=='resubmit') + # when ignore_priority is set, every experiment is submitted with a fixed priority of 100 (the default of SubmitExperimentToGemma.priority) instead of the row's priority + return [SubmitExperimentToGemma(row.experiment_id, priority=100 if self.ignore_priority else row.get('priority'), rerun=row.get('data')=='resubmit') for _, row in df.iterrows() if row.get('priority', 1) > 0] class SubmitExperimentsFromFileToGemma(SubmitExperimentsFromDataFrameMixin, TaskWithOutputMixin, WrapperTask): diff --git a/rnaseq_pipeline/utils.py b/rnaseq_pipeline/utils.py index b0c09a60..4eb5c3fb 100644 --- a/rnaseq_pipeline/utils.py +++ b/rnaseq_pipeline/utils.py @@ -49,10 +49,6 @@ def wrapper(cls): no_retry = max_retry(0) -class TaskWithPriorityMixin: - """Mixin that adds a --priority flag to a given task.""" - priority = luigi.IntParameter(default=0, positional=False, significant=False) - class RerunnableTaskMixin: """ Mixin for a task that can be rerun regardless of its completion status.