From e2bf8c4fa8c11211219077f903440f00ba668054 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Thu, 26 Oct 2023 09:39:54 -0700 Subject: [PATCH] Add direct executor in non-concurrent case This allows us to perform the operations on thread as opposed to submitting to a single-threaded executor. Additionally, breaks apart the Deephaven benchmarks into concurrent and non-concurrent versions. --- .../datetimecol/DateTimeColumnBenchmark.java | 9 +++++- .../DateTimeColumnParserDeephaven.java | 4 ++- .../doublecol/DoubleColumnBenchmark.java | 9 +++++- .../DoubleColumnParserDeephaven.java | 4 ++- .../benchmark/intcol/IntColumnBenchmark.java | 9 +++++- .../intcol/IntColumnParserDeephaven.java | 4 ++- .../LargeNumericOnlyBenchmark.java | 8 ++++- .../LargeNumericOnlyDeephaven.java | 4 +-- .../largetable/LargeTableBenchmark.java | 9 ++++-- .../largetable/LargeTableDeephaven.java | 4 +-- .../stringcol/StringColumnBenchmark.java | 9 +++++- .../StringColumnParserDeephaven.java | 4 ++- .../io/deephaven/csv/reading/CsvReader.java | 30 ++++++++++++++----- 13 files changed, 84 insertions(+), 23 deletions(-) diff --git a/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnBenchmark.java index 82696f11..c4a5eeef 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnBenchmark.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnBenchmark.java @@ -56,11 +56,18 @@ public static class ReusableStorage { public final long[][] output = Util.makeArray(ROWS, COLS, long[]::new, long[][]::new); } + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public BenchmarkResult deephavenSingle(final InputProvider input, final ReusableStorage storage) + throws Exception { + return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false); + } + @Benchmark @OperationsPerInvocation(OPERATIONS) public BenchmarkResult deephaven(final 
InputProvider input, final ReusableStorage storage) throws Exception { - return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output); + return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true); } @Benchmark diff --git a/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnParserDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnParserDeephaven.java index 48594ea4..36d57ed9 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnParserDeephaven.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/datetimecol/DateTimeColumnParserDeephaven.java @@ -12,11 +12,13 @@ import java.util.Collections; public final class DateTimeColumnParserDeephaven { - public static BenchmarkResult read(final InputStream in, final long[][] storage) throws Exception { + public static BenchmarkResult read(final InputStream in, final long[][] storage, boolean concurrent) + throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, null, null, storage); final CsvSpecs specs = CsvSpecs.builder() .parsers(Collections.singleton(Parsers.DATETIME)) .hasHeaderRow(true) + .concurrent(concurrent) .build(); final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory); final long[][] data = Arrays.stream(result.columns()) diff --git a/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnBenchmark.java index abc15cb2..5a024bea 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnBenchmark.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnBenchmark.java @@ -44,11 +44,18 @@ public static class ReusableStorage { public final double[][] output = Util.makeArray(ROWS, COLS, double[]::new, double[][]::new); } + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public BenchmarkResult 
deephavenSingle(final InputProvider input, final ReusableStorage storage) + throws Exception { + return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false); + } + @Benchmark @OperationsPerInvocation(OPERATIONS) public BenchmarkResult deephaven(final InputProvider input, final ReusableStorage storage) throws Exception { - return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output); + return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true); } @Benchmark diff --git a/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnParserDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnParserDeephaven.java index 22a94eec..5526f83e 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnParserDeephaven.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/doublecol/DoubleColumnParserDeephaven.java @@ -12,11 +12,13 @@ import java.util.Collections; public final class DoubleColumnParserDeephaven { - public static BenchmarkResult read(final InputStream in, final double[][] storage) throws Exception { + public static BenchmarkResult read(final InputStream in, final double[][] storage, boolean concurrent) + throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, storage, null, null); final CsvSpecs specs = CsvSpecs.builder() .parsers(Collections.singleton(Parsers.DOUBLE)) .hasHeaderRow(true) + .concurrent(concurrent) .build(); final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory); final double[][] data = Arrays.stream(result.columns()) diff --git a/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnBenchmark.java index 8d27a7ea..39246e38 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnBenchmark.java +++ 
b/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnBenchmark.java @@ -44,10 +44,17 @@ public static class ReusableStorage { public final int[][] output = Util.makeArray(ROWS, COLS, int[]::new, int[][]::new); } + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public BenchmarkResult deephavenSingle(final InputProvider input, final ReusableStorage storage) + throws Exception { + return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false); + } + @Benchmark @OperationsPerInvocation(OPERATIONS) public BenchmarkResult deephaven(final InputProvider input, final ReusableStorage storage) throws Exception { - return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output); + return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true); } @Benchmark diff --git a/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnParserDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnParserDeephaven.java index 02f82701..63d5e944 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnParserDeephaven.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/intcol/IntColumnParserDeephaven.java @@ -12,11 +12,13 @@ import java.util.Collections; public final class IntColumnParserDeephaven { - public static BenchmarkResult read(final InputStream in, final int[][] storage) throws Exception { + public static BenchmarkResult read(final InputStream in, final int[][] storage, boolean concurrent) + throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, storage, null, null, null, null); final CsvSpecs specs = CsvSpecs.builder() .parsers(Collections.singleton(Parsers.INT)) .hasHeaderRow(true) + .concurrent(concurrent) .build(); final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory); final int[][] data = Arrays.stream(result.columns()) diff --git 
a/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyBenchmark.java index 4544f412..df8c36bc 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyBenchmark.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyBenchmark.java @@ -116,9 +116,15 @@ public static class ReusableStorage { public final Results results = new Results(ROWS); } + @Benchmark + public void deephavenSingle(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception { + final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results, false); + bh.consume(results); + } + @Benchmark public void deephaven(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception { - final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results); + final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results, true); bh.consume(results); } } diff --git a/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyDeephaven.java index 973d93fb..55e47eec 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyDeephaven.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/largenumericonly/LargeNumericOnlyDeephaven.java @@ -4,13 +4,12 @@ import io.deephaven.csv.benchmark.util.SinkFactories; import io.deephaven.csv.parsers.Parsers; import io.deephaven.csv.reading.CsvReader; -import io.deephaven.csv.sinks.Sink; import io.deephaven.csv.sinks.SinkFactory; import java.io.InputStream; public class LargeNumericOnlyDeephaven { - public static Results read(final InputStream in, final Results results) throws Exception { + public static Results read(final InputStream in, final Results results, boolean 
concurrent) throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory( null, null, @@ -21,6 +20,7 @@ public static Results read(final InputStream in, final Results results) throws E final CsvSpecs specs = CsvSpecs.builder() .hasHeaderRow(true) + .concurrent(concurrent) .putParserForIndex(1, Parsers.LONG) .putParserForIndex(2, Parsers.LONG) .putParserForIndex(3, Parsers.LONG) diff --git a/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableBenchmark.java index 0363e654..9fba5a5e 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableBenchmark.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableBenchmark.java @@ -1,6 +1,5 @@ package io.deephaven.csv.benchmark.largetable; -import io.deephaven.csv.benchmark.doublecol.DoubleColumnBenchmark; import io.deephaven.csv.benchmark.util.Util; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; @@ -141,9 +140,15 @@ public static class ReusableStorage { public final Results results = new Results(ROWS); } + @Benchmark + public void deephavenSingle(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception { + final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results, false); + bh.consume(results); + } + @Benchmark public void deephaven(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception { - final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results); + final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results, true); bh.consume(results); } diff --git a/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableDeephaven.java index 49f7ab22..39ccfe37 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableDeephaven.java +++ 
b/src/jmh/java/io/deephaven/csv/benchmark/largetable/LargeTableDeephaven.java @@ -4,13 +4,12 @@ import io.deephaven.csv.benchmark.util.SinkFactories; import io.deephaven.csv.parsers.Parsers; import io.deephaven.csv.reading.CsvReader; -import io.deephaven.csv.sinks.Sink; import io.deephaven.csv.sinks.SinkFactory; import java.io.InputStream; public class LargeTableDeephaven { - public static Results read(final InputStream in, final Results results) throws Exception { + public static Results read(final InputStream in, final Results results, boolean concurrent) throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory( new byte[][] {results.boolsAsBytes}, null, @@ -20,6 +19,7 @@ public static Results read(final InputStream in, final Results results) throws E new long[][] {results.timestamps}); final CsvSpecs specs = CsvSpecs.builder() .hasHeaderRow(true) + .concurrent(concurrent) .putParserForIndex(1, Parsers.DATETIME) .putParserForIndex(2, Parsers.STRING) .putParserForIndex(3, Parsers.BOOLEAN) diff --git a/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnBenchmark.java b/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnBenchmark.java index 1300dd74..01393eab 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnBenchmark.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnBenchmark.java @@ -67,11 +67,18 @@ public String[][] output() { } } + @Benchmark + @OperationsPerInvocation(OPERATIONS) + public BenchmarkResult deephavenSingle(final InputProvider input, final ReusableStorage storage) + throws Exception { + return StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output(), false); + } + @Benchmark @OperationsPerInvocation(OPERATIONS) public BenchmarkResult deephaven(final InputProvider input, final ReusableStorage storage) throws Exception { - return StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output()); + return 
StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output(), true); } @Benchmark diff --git a/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnParserDeephaven.java b/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnParserDeephaven.java index 72be4441..cd48ac8f 100644 --- a/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnParserDeephaven.java +++ b/src/jmh/java/io/deephaven/csv/benchmark/stringcol/StringColumnParserDeephaven.java @@ -12,11 +12,13 @@ import java.util.Collections; public final class StringColumnParserDeephaven { - public static BenchmarkResult read(final InputStream in, final String[][] storage) throws Exception { + public static BenchmarkResult read(final InputStream in, final String[][] storage, boolean concurrent) + throws Exception { final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, null, storage, null); final CsvSpecs specs = CsvSpecs.builder() .parsers(Collections.singleton(Parsers.STRING)) .hasHeaderRow(true) + .concurrent(concurrent) .build(); final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory); final String[][] data = Arrays.stream(result.columns()) diff --git a/src/main/java/io/deephaven/csv/reading/CsvReader.java b/src/main/java/io/deephaven/csv/reading/CsvReader.java index ee2240e9..19d60eac 100644 --- a/src/main/java/io/deephaven/csv/reading/CsvReader.java +++ b/src/main/java/io/deephaven/csv/reading/CsvReader.java @@ -105,12 +105,15 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final dsrs.add(new Moveable<>(pair.second)); } - // Select an Excecutor based on whether the user wants the code to run asynchronously - // or not. - final ExecutorService exec = - specs.concurrent() - ? Executors.newFixedThreadPool(numOutputCols + 1) - : Executors.newSingleThreadExecutor(); + // Select an Executor based on whether the user wants the code to run asynchronously or not. 
+ final Executor exec; + final ExecutorService executorService; + if (specs.concurrent()) { + exec = executorService = Executors.newFixedThreadPool(numOutputCols + 1); + } else { + exec = DirectExecutor.INSTANCE; + executorService = null; + } // We are generic on Object because we have a diversity of Future types (Long vs // ParseDenseStorageToColumn.Result) final ExecutorCompletionService ecs = new ExecutorCompletionService<>(exec); @@ -155,8 +158,10 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final } catch (Throwable throwable) { throw new CsvReaderException("Caught exception", throwable); } finally { - // Tear down everything (interrupting the threads if necessary). - exec.shutdownNow(); + if (executorService != null) { + // Tear down everything (interrupting the threads if necessary). + executorService.shutdownNow(); + } } } @@ -369,4 +374,13 @@ public DataType dataType() { return dataType; } } + + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(@NotNull Runnable command) { + command.run(); + } + } }