Skip to content

Commit

Permalink
Merge pull request #184 from PolinaBevad/add_splicing_mode_stop_on_ex…
Browse files Browse the repository at this point in the history
…ception_NA24631_tests

Fixes for splicing mode, tests and parallel mode
  • Loading branch information
pcingola authored Feb 19, 2019
2 parents df58b9b + 90cab6e commit e2995c9
Show file tree
Hide file tree
Showing 27 changed files with 1,094 additions and 268 deletions.
4 changes: 2 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ String | Description
A variant appears in the output if it satisfies the following criteria (in this order):
1. Frequency of the variant exceeds the threshold set by the `-f` option (default = 1%).
2. The minimum number of high-quality reads supporting variant is larger than the threshold set by the `-r` option (default = 2).
3. The mean position of the variant in reads is less than the value set by the `-P` option (default = 5).
4. The mean base quality (phred score) for the variant is less than the threshold set by the `-q` option (default = 22.5).
3. The mean position of the variant in reads is larger than the value set by the `-P` option (default = 5).
4. The mean base quality (phred score) for the variant is larger than the threshold set by the `-q` option (default = 22.5).
5. Variant frequency is more than 25% or reference allele does not have much better mapping quality than the variant.
6. Deletion variants are not located in the regions where the reference genome is missing.
7. The ratio of high-quality reads to low-quality reads is larger than the threshold specified by `-o` option (default=1.5).
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/com/astrazeneca/vardict/VarDictLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import com.astrazeneca.vardict.data.ReferenceResource;
import com.astrazeneca.vardict.data.Region;
import com.astrazeneca.vardict.data.scopedata.GlobalReadOnlyScope;
import com.astrazeneca.vardict.modes.AbstractMode;
import com.astrazeneca.vardict.modes.AmpliconMode;
import com.astrazeneca.vardict.modes.SimpleMode;
import com.astrazeneca.vardict.modes.SomaticMode;
import com.astrazeneca.vardict.modes.*;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMSequenceRecord;
import htsjdk.samtools.SamReader;
Expand All @@ -30,6 +27,7 @@
import static com.astrazeneca.vardict.data.scopedata.GlobalReadOnlyScope.instance;
import static com.astrazeneca.vardict.collection.Tuple.tuple;
import static com.astrazeneca.vardict.data.Patterns.INTEGER_ONLY;
import static com.astrazeneca.vardict.data.scopedata.GlobalReadOnlyScope.setMode;

/**
* Class starts the Vardict for current run
Expand All @@ -43,23 +41,25 @@ public VarDictLauncher(ReferenceResource referenceResource) {
}

/**
* Initialize resources and starts the needed VarDict mode (amplicon/simple/somatic).
* @param config
* Initialize resources and starts the needed VarDict mode (amplicon/simple/somatic/splicing).
* @param config starting configuration
*/
public void start(Configuration config) {
initResources(config);

final Configuration conf = instance().conf;
final AbstractMode mode;

if (conf.regionOfInterest != null || instance().ampliconBasedCalling == null) {
if (instance().conf.outputSplicing) {
mode = new SplicingMode(segments, referenceResource);
} else if (conf.regionOfInterest != null || instance().ampliconBasedCalling == null) {
mode = conf.bam.hasBam2() ?
new SomaticMode(segments, referenceResource) :
new SimpleMode(segments, referenceResource);
} else {
mode = new AmpliconMode(segments, referenceResource);
}

setMode(mode);
if (instance().conf.threads == 1)
mode.notParallel();
else
Expand Down Expand Up @@ -111,7 +111,8 @@ private void initResources(Configuration conf) {
}
}
}
GlobalReadOnlyScope.init(conf, chrLengths, samples._1, samples._2, ampliconBasedCalling, adaptorForward, adaptorReverse);
GlobalReadOnlyScope.init(conf, chrLengths, samples._1, samples._2, ampliconBasedCalling,
adaptorForward, adaptorReverse);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.astrazeneca.vardict.data.scopedata;

import com.astrazeneca.vardict.Configuration;
import com.astrazeneca.vardict.modes.AbstractMode;
import com.astrazeneca.vardict.printers.PrinterType;

import java.util.Map;

/**
* Global scope of the VarDict. Contains configuration that must be available from all the classes and methods.
* Global scope of the VarDict. Contains configuration that must be available from all the classes and methods
* and current run mode of VarDict for starting pipelines.
* Must be initialized only once. Clear method created only for testing purposes.
*/
public class GlobalReadOnlyScope {
Expand All @@ -18,18 +20,34 @@ public static GlobalReadOnlyScope instance() {
}

public static synchronized void init(Configuration conf, Map<String, Integer> chrLengths, String sample, String samplem,
String ampliconBasedCalling, Map<String, Integer> adaptorForward, Map<String, Integer> adaptorReverse) {
String ampliconBasedCalling, Map<String, Integer> adaptorForward,
Map<String, Integer> adaptorReverse) {
if (instance != null) {
throw new IllegalStateException("GlobalReadOnlyScope was already initialized. Must be initialized only once.");
}
instance = new GlobalReadOnlyScope(conf, chrLengths, sample, samplem, ampliconBasedCalling, adaptorForward, adaptorReverse);
instance = new GlobalReadOnlyScope(conf, chrLengths, sample, samplem, ampliconBasedCalling, adaptorForward,
adaptorReverse);
}

private volatile static AbstractMode mode;

public static AbstractMode getMode() {
return mode;
}

public static synchronized void setMode(AbstractMode runMode) {
if (mode != null) {
throw new IllegalStateException("Mode was already initialized for GlobalReadOnlyScope. Must be initialized only once.");
}
mode = runMode;
}

/**
* TEST usage only
*/
public static synchronized void clear(){
instance = null;
mode = null;
}

public final Configuration conf;
Expand All @@ -42,7 +60,8 @@ public static synchronized void clear(){
public final Map<String, Integer> adaptorReverse;

public GlobalReadOnlyScope(Configuration conf, Map<String, Integer> chrLengths, String sample, String samplem,
String ampliconBasedCalling, Map<String, Integer> adaptorForward, Map<String, Integer> adaptorReverse) {
String ampliconBasedCalling, Map<String, Integer> adaptorForward,
Map<String, Integer> adaptorReverse) {
this.conf = conf;
this.chrLengths = chrLengths;
this.sample = sample;
Expand Down
40 changes: 34 additions & 6 deletions src/main/java/com/astrazeneca/vardict/modes/AbstractMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,31 @@ public AbstractMode(List<List<Region>> segments, ReferenceResource referenceReso
* @param executor current Executor for parallel/single mode
* @return object contains map of aligned variants
*/
public static CompletableFuture<Scope<AlignedVarsData>> pipeline(Scope<InitialData> initialDataScope,
public CompletableFuture<Scope<AlignedVarsData>> pipeline(Scope<InitialData> initialDataScope,
Executor executor) {

return CompletableFuture.supplyAsync(
() -> new SAMFileParser().process(initialDataScope), executor)
.thenApply(new CigarParser(false)::process)
.thenApply(new VariationRealigner()::process)
.thenApply(new StructuralVariantsProcessor()::process)
.thenApply(new ToVarsBuilder()::process);
.thenApply(new ToVarsBuilder()::process)
.exceptionally(ex -> {
stopVardictWithException(initialDataScope.region, ex);
throw new RuntimeException(ex);
});
}

/**
* Method for stopping VarDict with Exception. Number of exceptions set by Configuration.MAX_EXCEPTION_COUNT can be
* skipped, but after this limit program must stop even in parallel mode (where Executor may hang on Exception).
* @param region region where Exception occurs
* @param ex initial exception
*/
void stopVardictWithException(Region region, Throwable ex) {
System.err.println("Critical exception occurs on region: "
+ region.chr +":" + region.start + "-" + region.end + ", program will be stopped.");
ex.printStackTrace();
System.exit(1);
}

/**
Expand All @@ -60,15 +76,27 @@ public static CompletableFuture<Scope<AlignedVarsData>> pipeline(Scope<InitialDa
* @param executor current Executor for parallel/single mode
* @return object contains variation data (updated maps of variations and softclips).
*/
public static CompletableFuture<Scope<VariationData>> partialPipeline(Scope<InitialData> currentScope,
public CompletableFuture<Scope<VariationData>> partialPipeline(Scope<InitialData> currentScope,
Executor executor) {


return CompletableFuture.supplyAsync(
() -> new SAMFileParser().process(currentScope), executor)
.thenApply(new CigarParser(true)::process);
}

/**
* Starts pipeline for splicing mode: output only information about splice counts (N in CIGAR reads).
* @param currentScope current data for pipeline. contains data about BAM, region, reference and already
* filled maps of variations and softclips
* @param executor current Executor for parallel/single mode
* @return empty object for variation data.
*/
public CompletableFuture<Scope<VariationData>> splicingPipeline(Scope<InitialData> currentScope,
Executor executor) {
return CompletableFuture.supplyAsync(
() -> new SAMFileParser().process(currentScope), executor)
.thenApply(new CigarParser(false)::process);
}

public abstract void notParallel();

public void parallel() {
Expand Down
23 changes: 14 additions & 9 deletions src/main/java/com/astrazeneca/vardict/modes/AmpliconMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,20 @@ void produceTasks() throws InterruptedException, ExecutionException {
}

Region lastRegion = currentRegion;
CompletableFuture<OutputStream> processAmpliconOutput = CompletableFuture.supplyAsync(() -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(baos);
VariantPrinter variantPrinter = VariantPrinter.createPrinter(instance().printerTypeOut);
variantPrinter.setOut(out);
new AmpliconPostProcessModule().process(lastRegion, vars, pos, splice, variantPrinter);
out.close();
return baos;
}, executor);
CompletableFuture<OutputStream> processAmpliconOutput = CompletableFuture
.supplyAsync(() -> {
OutputStream baos = new ByteArrayOutputStream();
PrintStream out = new PrintStream(baos);
VariantPrinter variantPrinter = VariantPrinter.createPrinter(instance().printerTypeOut);
variantPrinter.setOut(out);
new AmpliconPostProcessModule().process(lastRegion, vars, pos, splice, variantPrinter);
out.close();
return baos;
}, executor)
.exceptionally(ex -> {
stopVardictWithException(lastRegion, ex);
throw new RuntimeException(ex);
});
toPrint.add(processAmpliconOutput);
}
toPrint.put(AbstractMode.LAST_SIGNAL_FUTURE);
Expand Down
42 changes: 23 additions & 19 deletions src/main/java/com/astrazeneca/vardict/modes/SimpleMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,11 @@ public void notParallel() {

for (List<Region> list : segments) {
for (Region region : list) {
simple(region, variantPrinter);
processBamInPipeline(region, variantPrinter);
}
}
}

/**
* For each segment and region starts the pipeline.
* @param region region from BED file/-R option to process.
* @param out variant printer used for output
*/
private void simple(Region region, VariantPrinter out) {
Reference ref = referenceResource.getReference(region);
Scope<InitialData> initialScope = new Scope<>(instance().conf.bam.getBam1(), region,
ref, referenceResource, 0, new HashSet<>(),
out, new InitialData());

CompletableFuture<Scope<AlignedVarsData>> pipeline = pipeline(initialScope, new DirectThreadExecutor());
CompletableFuture<Void> simpleProcessOutput = pipeline.thenAccept(new SimplePostProcessModule(out));
simpleProcessOutput.join();
}

/**
* In parallel mode workers are created for each region and are processed in parallel.
*/
Expand Down Expand Up @@ -99,13 +83,33 @@ public OutputStream call() {
PrintStream out = new PrintStream(baos);
VariantPrinter variantPrinter = VariantPrinter.createPrinter(instance().printerTypeOut);
variantPrinter.setOut(out);
simple(region, variantPrinter);

processBamInPipeline(region, variantPrinter);
out.close();
return baos;
}
}

/**
* For each segment and region starts the pipeline.
* @param region region from BED file/-R option to process.
* @param out variant printer used for output
*/
private void processBamInPipeline(Region region, VariantPrinter out) {
Reference ref = referenceResource.getReference(region);
Scope<InitialData> initialScope = new Scope<>(instance().conf.bam.getBam1(), region,
ref, referenceResource, 0, new HashSet<>(),
out, new InitialData());

CompletableFuture<Scope<AlignedVarsData>> pipeline = pipeline(initialScope, new DirectThreadExecutor());
CompletableFuture<Void> simpleProcessOutput = pipeline
.thenAccept(new SimplePostProcessModule(out))
.exceptionally(ex -> {
stopVardictWithException(region, ex);
throw new RuntimeException(ex);
});
simpleProcessOutput.join();
}

@Override
public void printHeader() {
if (instance().conf.printHeader) {
Expand Down
80 changes: 32 additions & 48 deletions src/main/java/com/astrazeneca/vardict/modes/SomaticMode.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,9 @@ public void notParallel() {

for (List<Region> list : segments) {
for (Region region : list) {

final Set<String> splice = new ConcurrentHashSet<>();
Reference ref = referenceResource.getReference(region);
Scope<InitialData> initialScope1 = new Scope<>(
instance().conf.bam.getBam1(),
region, ref, referenceResource,
0,
splice,
variantPrinter,
new InitialData());
CompletableFuture<Scope<AlignedVarsData>> bam1VariationsFuture = pipeline(initialScope1, new DirectThreadExecutor());

Scope<AlignedVarsData> bam1Variations = bam1VariationsFuture.join();

Scope<InitialData> initialScope2 = new Scope<>(
instance().conf.bam.getBam2(),
region, ref, referenceResource,
bam1Variations.maxReadLength,
splice,
variantPrinter,
new InitialData());
CompletableFuture<Scope<AlignedVarsData>> bam2VariationFuture = pipeline(initialScope2, new DirectThreadExecutor());

CompletableFuture<Void> somaticProcessOutput = bam2VariationFuture.thenAcceptBoth(bam1VariationsFuture,
new SomaticPostProcessModule(referenceResource, variantPrinter));

somaticProcessOutput.join();
processBothBamsInPipeline(variantPrinter, region, splice, ref);
}
}
}
Expand Down Expand Up @@ -117,35 +93,43 @@ public OutputStream call() {
PrintStream out = new PrintStream(baos);
VariantPrinter variantPrinter = VariantPrinter.createPrinter(instance().printerTypeOut);
variantPrinter.setOut(out);

Scope<InitialData> initialScope1 = new Scope<>(
instance().conf.bam.getBam1(),
region, ref, referenceResource,
0,
splice,
variantPrinter,
new InitialData());

CompletableFuture<Scope<AlignedVarsData>> bam1VariationFuture = pipeline(initialScope1, new DirectThreadExecutor());

Scope<InitialData> initialScope2 = new Scope<>(
instance().conf.bam.getBam2(),
region, ref, referenceResource,
0,
splice,
variantPrinter,
new InitialData());
CompletableFuture<Scope<AlignedVarsData>> bam2VariationFuture = pipeline(initialScope2, new DirectThreadExecutor());

CompletableFuture<Void> somaticProcessOutput = bam2VariationFuture.thenAcceptBoth(bam1VariationFuture,
new SomaticPostProcessModule(referenceResource, variantPrinter));
somaticProcessOutput.join();
processBothBamsInPipeline(variantPrinter, region, splice, ref);
out.close();

return baos;
}
}

private void processBothBamsInPipeline(VariantPrinter variantPrinter, Region region, Set<String> splice, Reference ref) {
Scope<InitialData> initialScope1 = new Scope<>(
instance().conf.bam.getBam1(),
region, ref, referenceResource,
0,
splice,
variantPrinter,
new InitialData());
CompletableFuture<Scope<AlignedVarsData>> bam1VariationsFuture = pipeline(initialScope1, new DirectThreadExecutor());

Scope<AlignedVarsData> bam1Variations = bam1VariationsFuture.join();

Scope<InitialData> initialScope2 = new Scope<>(
instance().conf.bam.getBam2(),
region, ref, referenceResource,
bam1Variations.maxReadLength,
splice,
variantPrinter,
new InitialData());
CompletableFuture<Scope<AlignedVarsData>> bam2VariationFuture = pipeline(initialScope2, new DirectThreadExecutor());

CompletableFuture<Void> somaticProcessOutput = bam2VariationFuture
.thenAcceptBoth(bam1VariationsFuture, new SomaticPostProcessModule(referenceResource, variantPrinter))
.exceptionally(ex -> {
stopVardictWithException(region, ex);
throw new RuntimeException(ex);
});
somaticProcessOutput.join();
}

@Override
public void printHeader() {
if (instance().conf.printHeader) {
Expand Down
Loading

0 comments on commit e2995c9

Please sign in to comment.