diff --git a/src/main/java/com/milaboratory/migec2/core/correct/CorrectorParameters.java b/src/main/java/com/milaboratory/migec2/core/correct/CorrectorParameters.java index 77c40d9..2bcae95 100644 --- a/src/main/java/com/milaboratory/migec2/core/correct/CorrectorParameters.java +++ b/src/main/java/com/milaboratory/migec2/core/correct/CorrectorParameters.java @@ -16,7 +16,7 @@ public final class CorrectorParameters implements ParameterSet { private final double maxBasePairsMaskedRatio; - public static CorrectorParameters DEFAULT = new CorrectorParameters(0.1, false, 10000.0, + public static CorrectorParameters DEFAULT = new CorrectorParameters(0.5, false, 10000.0, 5, 10, Util.PH33_LOW_QUAL, 0.5); public CorrectorParameters(double classifierProbabilityThreshold, diff --git a/src/main/java/com/milaboratory/migec2/core/io/misc/ReadInfo.java b/src/main/java/com/milaboratory/migec2/core/io/misc/ReadInfo.java index 0980fcb..b4276b4 100644 --- a/src/main/java/com/milaboratory/migec2/core/io/misc/ReadInfo.java +++ b/src/main/java/com/milaboratory/migec2/core/io/misc/ReadInfo.java @@ -15,15 +15,16 @@ */ package com.milaboratory.migec2.core.io.misc; +import com.milaboratory.core.sequencing.read.SequencingRead; import com.milaboratory.migec2.preproc.demultiplex.entity.CheckoutResult; -public class ReadInfo { - private final long id; +public class ReadInfo { + private final ReadType read; private final boolean flipMe, rcMe; private final CheckoutResult checkoutResult; - public ReadInfo(long id, boolean flipMe, boolean rcMe, CheckoutResult checkoutResult) { - this.id = id; + public ReadInfo(ReadType read, boolean flipMe, boolean rcMe, CheckoutResult checkoutResult) { + this.read = read; this.flipMe = flipMe; this.rcMe = rcMe; this.checkoutResult = checkoutResult; @@ -33,8 +34,8 @@ public CheckoutResult getCheckoutResult() { return checkoutResult; } - public long id() { - return id; + public ReadType getRead() { + return read; } public boolean flipMe() { diff --git a/src/main/java/com/milaboratory/migec2/core/io/misc/index/UmiIndexer.java b/src/main/java/com/milaboratory/migec2/core/io/misc/index/UmiIndexer.java index ba51dfe..d4f66cf 100644 --- a/src/main/java/com/milaboratory/migec2/core/io/misc/index/UmiIndexer.java +++ b/src/main/java/com/milaboratory/migec2/core/io/misc/index/UmiIndexer.java @@ -27,7 +27,7 @@ public ProcessorResultWrapper process(SequencingRead sequencingRea String sampleName = result.getSampleName(); NucleotideSequence umi = result.getUmi(); - ReadInfo readInfo = new ReadInfo(sequencingRead.id(), + ReadInfo readInfo = new ReadInfo(sequencingRead, masterFirst[result.getSampleId()] != result.masterFirst(), result.foundInRC(), result); diff --git a/src/main/java/com/milaboratory/migec2/core/io/readers/MigReader.java b/src/main/java/com/milaboratory/migec2/core/io/readers/MigReader.java index d98ebc5..f89af94 100644 --- a/src/main/java/com/milaboratory/migec2/core/io/readers/MigReader.java +++ b/src/main/java/com/milaboratory/migec2/core/io/readers/MigReader.java @@ -1,6 +1,7 @@ package com.milaboratory.migec2.core.io.readers; import cc.redberry.pipe.OutputPort; +import cc.redberry.pipe.OutputPortCloseable; import cc.redberry.pipe.blocks.Merger; import cc.redberry.pipe.blocks.ParallelProcessor; import cc.redberry.pipe.util.CountLimitingOutputPort; @@ -20,7 +21,7 @@ import java.util.*; -public abstract class MigReader implements OutputPort { +public abstract class MigReader implements OutputPort { private static final boolean ENABLE_BUFFERING = false; protected int sizeThreshold; @@ -54,7 +55,7 @@ protected MigReader(MigReaderParameters migReaderParameters, String sampleName) this.umiIndexer = new UmiIndexer(checkoutProcessor, migReaderParameters.getUmiQualThreshold()); } - protected void buildUmiIndex(OutputPort input) + protected void buildUmiIndex(OutputPortCloseable input) throws InterruptedException { // Set limit if required @@ -108,7 +109,7 @@ public void run() { umiIndexBySample.put(sampleName, new HashMap>()); } - // Take results, update histogram and index (not parallel) + // Take results, update histogram and index (single thread) ProcessorResultWrapper result; while ((result = indexingResults.take()) != null) { if (result.hasResult()) { @@ -141,11 +142,11 @@ protected boolean checkUmiMismatch(String sampleName, NucleotideSequence umi) { !umiHistogramBySample.get(sampleName).isMismatch(umi, minMismatchRatio); } - public T take() { + public MigType take() { return take(currentSample, sizeThreshold); } - protected abstract T take(String sampleName, int sizeThreshold); + protected abstract MigType take(String sampleName, int sizeThreshold); public List getSampleNames() { return sampleNames; diff --git a/src/main/java/com/milaboratory/migec2/core/io/readers/PMigReader.java b/src/main/java/com/milaboratory/migec2/core/io/readers/PMigReader.java index 32a877e..78a4630 100644 --- a/src/main/java/com/milaboratory/migec2/core/io/readers/PMigReader.java +++ b/src/main/java/com/milaboratory/migec2/core/io/readers/PMigReader.java @@ -18,8 +18,9 @@ import cc.redberry.pipe.OutputPortCloseable; import com.milaboratory.core.sequence.nucleotide.NucleotideSequence; import com.milaboratory.core.sequence.quality.QualityFormat; -import com.milaboratory.core.sequencing.io.fastq.SFastqReader; +import com.milaboratory.core.sequencing.io.fastq.PFastqReader; import com.milaboratory.core.sequencing.io.fastq.SRandomAccessFastqReader; +import com.milaboratory.core.sequencing.read.PSequencingRead; import com.milaboratory.core.sequencing.read.PSequencingReadImpl; import com.milaboratory.core.sequencing.read.SSequencingRead; import com.milaboratory.core.sequencing.read.SequencingRead; @@ -32,8 +33,6 @@ import com.milaboratory.migec2.preproc.demultiplex.processor.PCheckoutProcessor; import com.milaboratory.migec2.preproc.misc.ReadOverlapper; import com.milaboratory.migec2.util.Util; -import com.milaboratory.util.CompressionType; -import com.milaboratory.util.io.RecordIndexer; import java.io.File; import java.io.IOException; @@ -43,7 +42,6 @@ import java.util.Map; public final class PMigReader extends MigReader { - private SRandomAccessFastqReader rar1, rar2; private final ReadOverlapper readOverlapper; private final boolean performIlluminaRC; @@ -84,25 +82,14 @@ public PMigReader(File file1, File file2, String sampleName, private void preprocess(File file1, File file2) throws IOException, InterruptedException { // Only work with uncompressed files - final SFastqReader reader1 = new SFastqReader(file1, QualityFormat.Phred33, CompressionType.None), - reader2 = new SFastqReader(file2, QualityFormat.Phred33, CompressionType.None); - - // Creating indexer - final RecordIndexer indexer1 = new RecordIndexer(1L), - indexer2 = new RecordIndexer(1L); - reader1.attachIndexer(indexer1); - reader2.attachIndexer(indexer2); + final PFastqReader reader = new PFastqReader(file1, file2, QualityFormat.Phred33); // Build UMI index - buildUmiIndex(new PairedReaderWrapper(reader1, reader2)); - - // Creating random access fastq reader - rar1 = new SRandomAccessFastqReader(indexer1.createIndex(), file1); - rar2 = new SRandomAccessFastqReader(indexer2.createIndex(), file2); + buildUmiIndex(new PairedReaderWrapper(reader)); } @Override - protected PMig take(String sampleName, int sizeThreshold) { + protected synchronized PMig take(String sampleName, int sizeThreshold) { Iterator>> iterator = iteratorMap.get(sampleName); while (iterator.hasNext()) { Map.Entry> entry = iterator.next(); @@ -111,11 +98,9 @@ protected PMig take(String sampleName, int sizeThreshold) { readList2 = new ArrayList<>(); for (ReadInfo readInfo : entry.getValue()) { - SSequencingRead read1, read2; - rar1.seek(readInfo.id()); - read1 = rar1.readNext(); - rar2.seek(readInfo.id()); - read2 = rar2.readNext(); + PSequencingRead pRead = (PSequencingRead) readInfo.getRead(); + SSequencingRead read1 = pRead.getSingleRead(0), + read2 = pRead.getSingleRead(1); // Barcode was found in RC version of entire read pair // bring back to strand specified in checkout processor barcode @@ -191,27 +176,22 @@ protected PMig take(String sampleName, int sizeThreshold) { } private class PairedReaderWrapper implements OutputPortCloseable { - private final SFastqReader[] readers = new SFastqReader[2]; + private final PFastqReader reader; - public PairedReaderWrapper(SFastqReader reader1, SFastqReader reader2) { - readers[0] = reader1; - readers[1] = reader2; + public PairedReaderWrapper(PFastqReader reader) { + this.reader = reader; } @Override public void close() { - readers[0].close(); - readers[1].close(); + reader.close(); } @Override public SequencingRead take() { - synchronized (readers) { - SSequencingRead read1 = readers[0].take(); - if (read1 == null) - return null; - else - return new PSequencingReadImpl(read1, readers[1].take()); + // allows working with disable buffering + synchronized (reader) { + return reader.take(); } } } diff --git a/src/main/java/com/milaboratory/migec2/core/io/readers/SMigReader.java b/src/main/java/com/milaboratory/migec2/core/io/readers/SMigReader.java index 6b514cd..584b344 100644 --- a/src/main/java/com/milaboratory/migec2/core/io/readers/SMigReader.java +++ b/src/main/java/com/milaboratory/migec2/core/io/readers/SMigReader.java @@ -17,9 +17,7 @@ import cc.redberry.pipe.OutputPortCloseable; import com.milaboratory.core.sequence.nucleotide.NucleotideSequence; -import com.milaboratory.core.sequence.quality.QualityFormat; import com.milaboratory.core.sequencing.io.fastq.SFastqReader; -import com.milaboratory.core.sequencing.io.fastq.SRandomAccessFastqReader; import com.milaboratory.core.sequencing.read.SSequencingRead; import com.milaboratory.core.sequencing.read.SequencingRead; import com.milaboratory.migec2.core.io.entity.SMig; @@ -27,8 +25,6 @@ import com.milaboratory.migec2.core.io.misc.ReadInfo; import com.milaboratory.migec2.preproc.demultiplex.processor.SCheckoutProcessor; import com.milaboratory.migec2.util.Util; -import com.milaboratory.util.CompressionType; -import com.milaboratory.util.io.RecordIndexer; import java.io.File; import java.io.IOException; @@ -38,13 +34,10 @@ import java.util.Map; public final class SMigReader extends MigReader { - private SRandomAccessFastqReader rar; - public SMigReader(File file, SCheckoutProcessor checkoutProcessor) throws Exception { this(file, checkoutProcessor, MigReaderParameters.DEFAULT); } - //todo: store chekout and from stored checkout public SMigReader(File file, SCheckoutProcessor checkoutProcessor, MigReaderParameters migReaderParameters) throws IOException, InterruptedException { super(migReaderParameters, checkoutProcessor); @@ -64,30 +57,25 @@ public SMigReader(File file, String sampleName, MigReaderParameters migReaderPar private void preprocess(File file) throws IOException, InterruptedException { // Only work with uncompressed files - final SFastqReader reader = new SFastqReader(file, QualityFormat.Phred33, CompressionType.None); - - // Creating indexer - final RecordIndexer indexer = new RecordIndexer(1L); - reader.attachIndexer(indexer); + final SFastqReader reader = new SFastqReader(file); // Build UMI index buildUmiIndex(new SingleReaderWrapper(reader)); - - // Creating random access fastq reader - rar = new SRandomAccessFastqReader(indexer.createIndex(), file); } @Override - protected SMig take(String sampleName, int sizeThreshold) { + protected synchronized SMig take(String sampleName, int sizeThreshold) { Iterator>> iterator = iteratorMap.get(sampleName); while (iterator.hasNext()) { Map.Entry> entry = iterator.next(); if (entry.getValue().size() >= sizeThreshold && checkUmiMismatch(sampleName, entry.getKey())) { List readList = new ArrayList<>(); + // todo: handle adapter trimming case + for (ReadInfo readInfo : entry.getValue()) { - rar.seek(readInfo.id()); - readList.add(readInfo.rcMe() ? Util.rc(rar.readNext()) : rar.readNext()); + SSequencingRead read = (SSequencingRead) readInfo.getRead(); + readList.add(readInfo.rcMe() ? Util.rc(read) : read); } return new SMig(readList, entry.getKey()); @@ -110,6 +98,7 @@ public void close() { @Override public SequencingRead take() { + // allows working with disable buffering synchronized (reader) { return reader.take(); }