Skip to content

Commit

Permalink
Just store all reads in the memory, no random access anymore -> drama…
Browse files Browse the repository at this point in the history
…tic speedup. Close #6
  • Loading branch information
mikessh committed Feb 16, 2015
1 parent 05de98a commit 8466f3b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public final class CorrectorParameters implements ParameterSet {

private final double maxBasePairsMaskedRatio;

public static CorrectorParameters DEFAULT = new CorrectorParameters(0.1, false, 10000.0,
public static CorrectorParameters DEFAULT = new CorrectorParameters(0.5, false, 10000.0,
5, 10, Util.PH33_LOW_QUAL, 0.5);

public CorrectorParameters(double classifierProbabilityThreshold,
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/com/milaboratory/migec2/core/io/misc/ReadInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
*/
package com.milaboratory.migec2.core.io.misc;

import com.milaboratory.core.sequencing.read.SequencingRead;
import com.milaboratory.migec2.preproc.demultiplex.entity.CheckoutResult;

public class ReadInfo {
private final long id;
public class ReadInfo<ReadType extends SequencingRead> {
private final ReadType read;
private final boolean flipMe, rcMe;
private final CheckoutResult checkoutResult;

public ReadInfo(long id, boolean flipMe, boolean rcMe, CheckoutResult checkoutResult) {
this.id = id;
public ReadInfo(ReadType read, boolean flipMe, boolean rcMe, CheckoutResult checkoutResult) {
this.read = read;
this.flipMe = flipMe;
this.rcMe = rcMe;
this.checkoutResult = checkoutResult;
Expand All @@ -33,8 +34,8 @@ public CheckoutResult getCheckoutResult() {
return checkoutResult;
}

public long id() {
return id;
public ReadType getRead() {
return read;
}

public boolean flipMe() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public ProcessorResultWrapper<IndexingInfo> process(SequencingRead sequencingRea
String sampleName = result.getSampleName();
NucleotideSequence umi = result.getUmi();

ReadInfo readInfo = new ReadInfo(sequencingRead.id(),
ReadInfo readInfo = new ReadInfo(sequencingRead,
masterFirst[result.getSampleId()] != result.masterFirst(),
result.foundInRC(), result);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.milaboratory.migec2.core.io.readers;

import cc.redberry.pipe.OutputPort;
import cc.redberry.pipe.OutputPortCloseable;
import cc.redberry.pipe.blocks.Merger;
import cc.redberry.pipe.blocks.ParallelProcessor;
import cc.redberry.pipe.util.CountLimitingOutputPort;
Expand All @@ -20,7 +21,7 @@

import java.util.*;

public abstract class MigReader<T extends Mig> implements OutputPort<T> {
public abstract class MigReader<MigType extends Mig> implements OutputPort<MigType> {
private static final boolean ENABLE_BUFFERING = false;

protected int sizeThreshold;
Expand Down Expand Up @@ -54,7 +55,7 @@ protected MigReader(MigReaderParameters migReaderParameters, String sampleName)
this.umiIndexer = new UmiIndexer(checkoutProcessor, migReaderParameters.getUmiQualThreshold());
}

protected void buildUmiIndex(OutputPort<SequencingRead> input)
protected void buildUmiIndex(OutputPortCloseable<SequencingRead> input)
throws InterruptedException {

// Set limit if required
Expand Down Expand Up @@ -108,7 +109,7 @@ public void run() {
umiIndexBySample.put(sampleName, new HashMap<NucleotideSequence, List<ReadInfo>>());
}

// Take results, update histogram and index (not parallel)
// Take results, update histogram and index (single thread)
ProcessorResultWrapper<IndexingInfo> result;
while ((result = indexingResults.take()) != null) {
if (result.hasResult()) {
Expand Down Expand Up @@ -141,11 +142,11 @@ protected boolean checkUmiMismatch(String sampleName, NucleotideSequence umi) {
!umiHistogramBySample.get(sampleName).isMismatch(umi, minMismatchRatio);
}

public T take() {
public MigType take() {
return take(currentSample, sizeThreshold);
}

protected abstract T take(String sampleName, int sizeThreshold);
protected abstract MigType take(String sampleName, int sizeThreshold);

public List<String> getSampleNames() {
return sampleNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import cc.redberry.pipe.OutputPortCloseable;
import com.milaboratory.core.sequence.nucleotide.NucleotideSequence;
import com.milaboratory.core.sequence.quality.QualityFormat;
import com.milaboratory.core.sequencing.io.fastq.SFastqReader;
import com.milaboratory.core.sequencing.io.fastq.PFastqReader;
import com.milaboratory.core.sequencing.io.fastq.SRandomAccessFastqReader;
import com.milaboratory.core.sequencing.read.PSequencingRead;
import com.milaboratory.core.sequencing.read.PSequencingReadImpl;
import com.milaboratory.core.sequencing.read.SSequencingRead;
import com.milaboratory.core.sequencing.read.SequencingRead;
Expand All @@ -32,8 +33,6 @@
import com.milaboratory.migec2.preproc.demultiplex.processor.PCheckoutProcessor;
import com.milaboratory.migec2.preproc.misc.ReadOverlapper;
import com.milaboratory.migec2.util.Util;
import com.milaboratory.util.CompressionType;
import com.milaboratory.util.io.RecordIndexer;

import java.io.File;
import java.io.IOException;
Expand All @@ -43,7 +42,6 @@
import java.util.Map;

public final class PMigReader extends MigReader<PMig> {
private SRandomAccessFastqReader rar1, rar2;
private final ReadOverlapper readOverlapper;
private final boolean performIlluminaRC;

Expand Down Expand Up @@ -84,25 +82,14 @@ public PMigReader(File file1, File file2, String sampleName,

private void preprocess(File file1, File file2) throws IOException, InterruptedException {
// Only work with uncompressed files
final SFastqReader reader1 = new SFastqReader(file1, QualityFormat.Phred33, CompressionType.None),
reader2 = new SFastqReader(file2, QualityFormat.Phred33, CompressionType.None);

// Creating indexer
final RecordIndexer indexer1 = new RecordIndexer(1L),
indexer2 = new RecordIndexer(1L);
reader1.attachIndexer(indexer1);
reader2.attachIndexer(indexer2);
final PFastqReader reader = new PFastqReader(file1, file2, QualityFormat.Phred33);

// Build UMI index
buildUmiIndex(new PairedReaderWrapper(reader1, reader2));

// Creating random access fastq reader
rar1 = new SRandomAccessFastqReader(indexer1.createIndex(), file1);
rar2 = new SRandomAccessFastqReader(indexer2.createIndex(), file2);
buildUmiIndex(new PairedReaderWrapper(reader));
}

@Override
protected PMig take(String sampleName, int sizeThreshold) {
protected synchronized PMig take(String sampleName, int sizeThreshold) {
Iterator<Map.Entry<NucleotideSequence, List<ReadInfo>>> iterator = iteratorMap.get(sampleName);
while (iterator.hasNext()) {
Map.Entry<NucleotideSequence, List<ReadInfo>> entry = iterator.next();
Expand All @@ -111,11 +98,9 @@ protected PMig take(String sampleName, int sizeThreshold) {
readList2 = new ArrayList<>();

for (ReadInfo readInfo : entry.getValue()) {
SSequencingRead read1, read2;
rar1.seek(readInfo.id());
read1 = rar1.readNext();
rar2.seek(readInfo.id());
read2 = rar2.readNext();
PSequencingRead pRead = (PSequencingRead) readInfo.getRead();
SSequencingRead read1 = pRead.getSingleRead(0),
read2 = pRead.getSingleRead(1);

// Barcode was found in RC version of entire read pair
// bring back to strand specified in checkout processor barcode
Expand Down Expand Up @@ -191,27 +176,22 @@ protected PMig take(String sampleName, int sizeThreshold) {
}

private class PairedReaderWrapper implements OutputPortCloseable<SequencingRead> {
private final SFastqReader[] readers = new SFastqReader[2];
private final PFastqReader reader;

public PairedReaderWrapper(SFastqReader reader1, SFastqReader reader2) {
readers[0] = reader1;
readers[1] = reader2;
public PairedReaderWrapper(PFastqReader reader) {
this.reader = reader;
}

@Override
public void close() {
readers[0].close();
readers[1].close();
reader.close();
}

@Override
public SequencingRead take() {
synchronized (readers) {
SSequencingRead read1 = readers[0].take();
if (read1 == null)
return null;
else
return new PSequencingReadImpl(read1, readers[1].take());
// allows working with disable buffering
synchronized (reader) {
return reader.take();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@

import cc.redberry.pipe.OutputPortCloseable;
import com.milaboratory.core.sequence.nucleotide.NucleotideSequence;
import com.milaboratory.core.sequence.quality.QualityFormat;
import com.milaboratory.core.sequencing.io.fastq.SFastqReader;
import com.milaboratory.core.sequencing.io.fastq.SRandomAccessFastqReader;
import com.milaboratory.core.sequencing.read.SSequencingRead;
import com.milaboratory.core.sequencing.read.SequencingRead;
import com.milaboratory.migec2.core.io.entity.SMig;
import com.milaboratory.migec2.core.io.misc.MigReaderParameters;
import com.milaboratory.migec2.core.io.misc.ReadInfo;
import com.milaboratory.migec2.preproc.demultiplex.processor.SCheckoutProcessor;
import com.milaboratory.migec2.util.Util;
import com.milaboratory.util.CompressionType;
import com.milaboratory.util.io.RecordIndexer;

import java.io.File;
import java.io.IOException;
Expand All @@ -38,13 +34,10 @@
import java.util.Map;

public final class SMigReader extends MigReader<SMig> {
private SRandomAccessFastqReader rar;

public SMigReader(File file, SCheckoutProcessor checkoutProcessor) throws Exception {
this(file, checkoutProcessor, MigReaderParameters.DEFAULT);
}

//todo: store chekout and from stored checkout
public SMigReader(File file, SCheckoutProcessor checkoutProcessor, MigReaderParameters migReaderParameters)
throws IOException, InterruptedException {
super(migReaderParameters, checkoutProcessor);
Expand All @@ -64,30 +57,25 @@ public SMigReader(File file, String sampleName, MigReaderParameters migReaderPar

private void preprocess(File file) throws IOException, InterruptedException {
// Only work with uncompressed files
final SFastqReader reader = new SFastqReader(file, QualityFormat.Phred33, CompressionType.None);

// Creating indexer
final RecordIndexer indexer = new RecordIndexer(1L);
reader.attachIndexer(indexer);
final SFastqReader reader = new SFastqReader(file);

// Build UMI index
buildUmiIndex(new SingleReaderWrapper(reader));

// Creating random access fastq reader
rar = new SRandomAccessFastqReader(indexer.createIndex(), file);
}

@Override
protected SMig take(String sampleName, int sizeThreshold) {
protected synchronized SMig take(String sampleName, int sizeThreshold) {
Iterator<Map.Entry<NucleotideSequence, List<ReadInfo>>> iterator = iteratorMap.get(sampleName);
while (iterator.hasNext()) {
Map.Entry<NucleotideSequence, List<ReadInfo>> entry = iterator.next();
if (entry.getValue().size() >= sizeThreshold && checkUmiMismatch(sampleName, entry.getKey())) {
List<SSequencingRead> readList = new ArrayList<>();

// todo: handle adapter trimming case

for (ReadInfo readInfo : entry.getValue()) {
rar.seek(readInfo.id());
readList.add(readInfo.rcMe() ? Util.rc(rar.readNext()) : rar.readNext());
SSequencingRead read = (SSequencingRead) readInfo.getRead();
readList.add(readInfo.rcMe() ? Util.rc(read) : read);
}

return new SMig(readList, entry.getKey());
Expand All @@ -110,6 +98,7 @@ public void close() {

@Override
public SequencingRead take() {
// allows working with disable buffering
synchronized (reader) {
return reader.take();
}
Expand Down

0 comments on commit 8466f3b

Please sign in to comment.