diff --git a/.github/resources/adhoc-benchmark-docker-compose.yml b/.github/resources/adhoc-benchmark-docker-compose.yml index d33ff3a9..22803ada 100644 --- a/.github/resources/adhoc-benchmark-docker-compose.yml +++ b/.github/resources/adhoc-benchmark-docker-compose.yml @@ -30,3 +30,39 @@ services: - 8082:8082 - 9092:9092 - 29092:29092 + + minio-server: + image: minio/minio:RELEASE.2024-07-04T14-25-45Z + command: server /minio --console-address ":9001" + hostname: minio + environment: + MINIO_DOMAIN: minio + networks: + default: + aliases: + - data.minio + expose: + - "9000" + - "9001" + ports: + - 9000:9000 + - 9001:9001 + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + timeout: 5s + retries: 5 + volumes: + - ./minio:/minio + + minio-bucket: + image: minio/mc + depends_on: + - minio-server + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set endpoint http://minio:9000 minioadmin minioadmin; + /usr/bin/mc mb endpoint/data; + /usr/bin/mc anonymous set public endpoint/data; + exit 0; + " diff --git a/.github/resources/nightly-benchmark-docker-compose.yml b/.github/resources/nightly-benchmark-docker-compose.yml index b6a26c4c..b791727f 100644 --- a/.github/resources/nightly-benchmark-docker-compose.yml +++ b/.github/resources/nightly-benchmark-docker-compose.yml @@ -30,3 +30,39 @@ services: - 8082:8082 - 9092:9092 - 29092:29092 + + minio-server: + image: minio/minio:RELEASE.2024-07-04T14-25-45Z + command: server /minio --console-address ":9001" + hostname: minio + environment: + MINIO_DOMAIN: minio + networks: + default: + aliases: + - data.minio + expose: + - "9000" + - "9001" + ports: + - 9000:9000 + - 9001:9001 + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + timeout: 5s + retries: 5 + volumes: + - ./minio:/minio + + minio-bucket: + image: minio/mc + depends_on: + - minio-server + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set endpoint http://minio:9000 minioadmin minioadmin; + /usr/bin/mc mb endpoint/data; + 
/usr/bin/mc anonymous set public endpoint/data; + exit 0; + " diff --git a/.github/resources/release-benchmark-docker-compose.yml b/.github/resources/release-benchmark-docker-compose.yml index d33ff3a9..22803ada 100644 --- a/.github/resources/release-benchmark-docker-compose.yml +++ b/.github/resources/release-benchmark-docker-compose.yml @@ -30,3 +30,39 @@ services: - 8082:8082 - 9092:9092 - 29092:29092 + + minio-server: + image: minio/minio:RELEASE.2024-07-04T14-25-45Z + command: server /minio --console-address ":9001" + hostname: minio + environment: + MINIO_DOMAIN: minio + networks: + default: + aliases: + - data.minio + expose: + - "9000" + - "9001" + ports: + - 9000:9000 + - 9001:9001 + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + timeout: 5s + retries: 5 + volumes: + - ./minio:/minio + + minio-bucket: + image: minio/mc + depends_on: + - minio-server + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set endpoint http://minio:9000 minioadmin minioadmin; + /usr/bin/mc mb endpoint/data; + /usr/bin/mc anonymous set public endpoint/data; + exit 0; + " diff --git a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java index bd91e96e..52685321 100644 --- a/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/compare/CompareTestRunner.java @@ -9,7 +9,9 @@ import java.util.concurrent.atomic.AtomicReference; import io.deephaven.benchmark.api.Bench; import io.deephaven.benchmark.controller.DeephavenDockerController; +import io.deephaven.benchmark.metric.Metrics; import io.deephaven.benchmark.util.Filer; +import io.deephaven.benchmark.util.Timer; /** * A wrapper for the Bench api that allows running tests for the purpose of comparing Deephaven to other products that @@ -55,11 +57,11 @@ public void addDownloadFile(String sourceUri, String destDir) { * @param columnNames the columns in both 
tables to included */ public void initDeephaven(int rowCountFactor, String leftTable, String rightTable, String... columnNames) { - restartDocker(); + restartServices(); generateTable(rowCountFactor, leftTable, columnNames); if (rightTable != null) generateTable(rowCountFactor, rightTable, columnNames); - restartDocker(); + restartServices(); initialize(testInst); } @@ -303,6 +305,7 @@ Result runTest(String name, String query, String setup, String operation, String if (api == null) throw new RuntimeException("Initialize with initDeephaven() or initPython()s before running the test"); api.setName(name); + stopUnusedServices(); query = query.replace("${setupQueries}", setup); query = query.replace("${operation}", operation); query = query.replace("${mainSizeGetter}", mainSizeGetter); @@ -352,23 +355,22 @@ def run_script(runner, script_name): api.query(query).execute(); } - void restartDocker() { - var api = Bench.create("# Docker Restart"); + void restartServices() { + var api = Bench.create("# Services Restart"); try { - api.setName("# Docker Restart"); - var controller = new DeephavenDockerController(api.property("docker.compose.file", ""), + api.setName("# Services Restart"); + var c = new DeephavenDockerController(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")); - if (!controller.restartService()) - return; + c.restartService(); } finally { api.close(); } } void restartDocker(int heapGigs) { - var api = Bench.create("# Docker Restart"); + var api = Bench.create("# Services Restart"); try { - api.setName("# Docker Restart " + heapGigs + "G"); + api.setName("# Services Restart " + heapGigs + "G"); String dockerComposeFile = api.property("docker.compose.file", ""); String deephavenHostPort = api.property("deephaven.addr", ""); if (dockerComposeFile.isBlank() || deephavenHostPort.isBlank()) @@ -381,6 +383,16 @@ void restartDocker(int heapGigs) { } } + void stopUnusedServices() { + var timer = api.timer(); + var c = new 
DeephavenDockerController(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")); + if (!c.stopService(Set.of("deephaven"))) + return; + var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); + metrics.set("stop", timer.duration().toMillis(), "standard"); + api.metrics().add(metrics); + } + // Replace heap (e.g. -Xmx64g) in docker-compose.yml with new heap value String makeHeapAdjustedDockerCompose(String dockerComposeFile, int heapGigs) { Path sourceComposeFile = Paths.get(dockerComposeFile); diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java index 3af89035..1175a8e7 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/StandardTestRunner.java @@ -25,6 +25,7 @@ final public class StandardTestRunner { final List supportTables = new ArrayList<>(); final List setupQueries = new ArrayList<>(); final List preOpQueries = new ArrayList<>(); + final Set requiredServices = new TreeSet<>(List.of("deephaven")); private String mainTable = "source"; private Bench api; private Controller controller; @@ -89,6 +90,11 @@ public void groupedTable(String name, String... groups) { mainTable = name; generateTable(name, null, groups); } + + public void setServices(String... services) { + requiredServices.clear(); + requiredServices.addAll(Arrays.asList(services)); + } /** * Add a query to be run directly after the main table is loaded. It is not measured. This query can transform the @@ -289,6 +295,8 @@ Result runTest(String name, String query, String operation, String read, String. 
if (api.isClosed()) initialize(testInst); api.setName(name); + stopUnusedServices(requiredServices); + query = query.replace("${readTable}", read); query = query.replace("${mainTable}", mainTable); query = query.replace("${loadSupportTables}", loadSupportTables()); @@ -318,7 +326,7 @@ Result runTest(String name, String query, String operation, String read, String. api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount()); return result.get(); } finally { - addDockerLog(api); + addServiceLog(api); api.close(); } } @@ -353,36 +361,45 @@ void initialize(Object testInst) { this.api = Bench.create(testInst); this.controller = new DeephavenDockerController(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")); - restartDocker(); + restartServices(); api.query(query).execute(); } - void addDockerLog(Bench api) { + void addServiceLog(Bench api) { var timer = api.timer(); var logText = controller.getLog(); if (logText.isBlank()) return; api.log().add("deephaven-engine", logText); - var metrics = new Metrics(Timer.now(), "test-runner", "teardown.docker"); + var metrics = new Metrics(Timer.now(), "test-runner", "teardown.services"); metrics.set("log", timer.duration().toMillis(), "standard"); api.metrics().add(metrics); } - void restartDocker() { + void restartServices() { var timer = api.timer(); if (!controller.restartService()) return; - var metrics = new Metrics(Timer.now(), "test-runner", "setup.docker"); + var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); metrics.set("restart", timer.duration().toMillis(), "standard"); api.metrics().add(metrics); } + + void stopUnusedServices(Set keepServices) { + var timer = api.timer(); + if (!controller.stopService(keepServices)) + return; + var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); + metrics.set("stop", timer.duration().toMillis(), "standard"); + api.metrics().add(metrics); + } void generateTable(String name, String 
distribution, String[] groups) { var isNew = generateNamedTable(name, distribution, groups); if (isNew) { if (!api.isClosed()) { api.setName("# Data Table Generation " + name); - addDockerLog(api); + addServiceLog(api); api.close(); } initialize(testInst); diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java index e3ad4786..13b5b05e 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/file/FileTestRunner.java @@ -2,7 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Duration; -import java.util.Arrays; +import java.util.*; import java.util.stream.Collectors; import io.deephaven.benchmark.api.Bench; import io.deephaven.benchmark.controller.Controller; @@ -16,6 +16,7 @@ class FileTestRunner { final String parquetCfg = "max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536"; final Object testInst; + final Set requiredServices = new TreeSet<>(List.of("deephaven")); private Bench api; private Controller controller; private double rowCountFactor = 1; @@ -40,6 +41,17 @@ void setScaleFactors(double rowCountFactor, int scaleFactor) { this.scaleFactor = scaleFactor; } + /** + * Sets the prefixes of the services required to run this test. Those services will be turned on while the rest will + * be turned off. + * + * @param servicePrefixes the prefixes of the services required to run the test + */ + void setRequiredServices(String... servicePrefixes) { + requiredServices.clear(); + requiredServices.addAll(Arrays.asList(servicePrefixes)); + } + /** * Use the default settings in deephaven-core for parquet dictionary and page size instead of the defaults used for * benchmarks @@ -78,7 +90,24 @@ void runParquetReadTest(String testName) { } /** - * Run a benchmark the measures parquet write performance.
+ * Run a benchmark that measures parquet S3 read performance. This test always runs after a corresponding write + * test. + * + * @param testName name that will appear in the results as the benchmark name + */ + void runParquetS3ReadTest(String testName) { + var q = """ + read('s3://data/source.ptr.parquet', special_instructions=s3.S3Instructions( + region_name='aws-global', endpoint_override='http://minio:9000', + access_key_id='minioadmin', secret_access_key='minioadmin', + read_timeout='PT20S', connection_timeout='PT20S' + )).select() + """; + runReadTest(testName, q); + } + + /** + * Run a benchmark that measures parquet write performance. * * @param testName the benchmark name to record with the measurement * @param codec a compression codec @@ -86,6 +115,7 @@ void runParquetReadTest(String testName) { */ void runParquetWriteTest(String testName, String codec, String... columnNames) { var q = """ + remove_path('/data/source.ptr.parquet') write( source, '/data/source.ptr.parquet', compression_codec_name='${codec}'${parquetSettings} ) @@ -97,6 +127,26 @@ void runParquetWriteTest(String testName, String codec, String... columnNames) { runWriteTest(testName, q, columnNames); } + /** + * Run a benchmark that measures parquet S3 write performance. + * + * @param testName the benchmark name to record with the measurement + * @param columnNames the names of the pre-defined columns to generate + */ + void runParquetS3WriteTest(String testName, String... columnNames) { + var q = """ + remove_path('/minio/data/source.ptr.parquet') + write( + source, 's3://data/source.ptr.parquet', special_instructions=s3.S3Instructions( + region_name='aws-global', endpoint_override='http://minio:9000', + access_key_id='minioadmin', secret_access_key='minioadmin', + connection_timeout='PT20S' + ) + ) + """; + runWriteTest(testName, q, columnNames); + } + /** * Run a benchmark the measures csv write performance. 
* @@ -105,6 +155,7 @@ void runParquetWriteTest(String testName, String codec, String... columnNames) { */ void runCsvWriteTest(String testName, String... columnNames) { var q = """ + remove_path('/data/source.ptr.parquet') write_csv(source, '/data/source.ptr.csv') metric_file_size = os.path.getsize('/data/source.ptr.csv') bench_api_metrics_add('data', 'file.size', str(metric_file_size), 'csv') @@ -136,9 +187,12 @@ private void runReadTest(String testName, String readQuery, String... columnName private void runWriteTest(String testName, String writeQuery, String... columnNames) { var q = """ - source = merge([empty_table(${rowCount}).update([ - ${generators} - ])] * ${scaleFactor}) + if(${scaleFactor} > 1): + source = merge([empty_table(${rowCount}).update([ + ${generators} + ])] * ${scaleFactor}) + else: + source = empty_table(${rowCount}).update([${generators}]) begin_time = time.perf_counter_ns() ${writeQuery} @@ -161,6 +215,7 @@ private void runWriteTest(String testName, String writeQuery, String... 
columnNa private void runTest(String testName, String query) { try { api.setName(testName); + stopUnusedServices(requiredServices); api.query(query).fetchAfter("stats", table -> { long rowCount = table.getSum("processed_row_count").longValue(); long elapsedNanos = table.getSum("elapsed_nanos").longValue(); @@ -243,12 +298,18 @@ private String getType(String columnName) { private Bench initialize(Object testInst) { var query = """ - import time, os + import time, os, shutil from deephaven import empty_table, garbage_collect, new_table, merge from deephaven.column import long_col, double_col from deephaven.parquet import read, write from deephaven import read_csv, write_csv from deephaven import dtypes as dht + from deephaven.experimental import s3 + + def remove_path(path): + if(os.path.exists(path)): + if(os.path.isdir(path)): shutil.rmtree(path) + else: os.remove(path) bench_api_metrics_init() """; @@ -281,5 +342,14 @@ private void restartDocker() { metrics.set("restart", timer.duration().toMillis(), "standard"); api.metrics().add(metrics); } + + private void stopUnusedServices(Set keepServices) { + var timer = api.timer(); + if (!controller.stopService(keepServices)) + return; + var metrics = new Metrics(Timer.now(), "test-runner", "setup.services"); + metrics.set("stop", timer.duration().toMillis(), "standard"); + api.metrics().add(metrics); + } } diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/file/ParquetColTypeTest.java b/src/it/java/io/deephaven/benchmark/tests/standard/file/ParquetColTypeTest.java index 340eb781..8fd871a7 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/file/ParquetColTypeTest.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/file/ParquetColTypeTest.java @@ -11,10 +11,14 @@ class ParquetColTypeTest { final FileTestRunner runner = new FileTestRunner(this); + void setup(double rowFactor, int scaleFactor) { + runner.setScaleFactors(rowFactor, scaleFactor); + } + @Test @Order(1) void 
writeFourIntegralCols() { - runner.setScaleFactors(5, 12); + setup(5, 12); runner.runParquetWriteTest("ParquetWrite- 4 Integral Cols -Static", "NONE", "byte100", "short10K", "int10K", "long10K"); } @@ -22,21 +26,21 @@ void writeFourIntegralCols() { @Test @Order(2) void readFourIntegralCols() { - runner.setScaleFactors(5, 12); + setup(5, 12); runner.runParquetReadTest("ParquetRead- 4 Integral Cols -Static"); } @Test @Order(3) void writeOneStringCol() { - runner.setScaleFactors(5, 30); + setup(5, 30); runner.runParquetWriteTest("ParquetWrite- 1 String Col -Static", "NONE", "str10K"); } @Test @Order(4) void readOneStringCol() { - runner.setScaleFactors(5, 30); + setup(5, 30); runner.runParquetReadTest("ParquetRead- 1 String Col -Static"); } @@ -44,7 +48,7 @@ void readOneStringCol() { @Order(5) @Tag("Iterate") void writeOneBigDecimalCol() { - runner.setScaleFactors(5, 5); + setup(5, 5); runner.runParquetWriteTest("ParquetWrite- 1 Big Decimal Col -Static", "NONE", "bigDec10K"); } @@ -52,91 +56,91 @@ void writeOneBigDecimalCol() { @Order(6) @Tag("Iterate") void readOneBigDecimalCol() { - runner.setScaleFactors(5, 5); + setup(5, 5); runner.runParquetReadTest("ParquetRead- 1 Big Decimal Col -Static"); } @Test @Order(7) void writeOneInt1KArrayCol() { - runner.setScaleFactors(0.10, 2); + setup(0.10, 2); runner.runParquetWriteTest("ParquetWrite- 1 Array Col of 1K Ints -Static", "NONE", "intArr1K"); } @Test @Order(8) void readOneInt1KArrayCol() { - runner.setScaleFactors(0.10, 2); + setup(0.10, 2); runner.runParquetReadTest("ParquetRead- 1 Array Col of 1K Ints -Static"); } @Test @Order(9) void writeOneInt1KVectorCol() { - runner.setScaleFactors(0.10, 2); + setup(0.10, 2); runner.runParquetWriteTest("ParquetWrite- 1 Vector Col of 1K Ints -Static", "NONE", "intVec1K"); } @Test @Order(10) void readOneInt1KVectorCol() { - runner.setScaleFactors(0.10, 2); + setup(0.10, 2); runner.runParquetReadTest("ParquetRead- 1 Vector Col of 1K Ints -Static"); } @Test @Order(11) void 
writeOneInt5ArrayCol() { - runner.setScaleFactors(2, 20); + setup(2, 20); runner.runParquetWriteTest("ParquetWrite- 1 Array Col of 5 Ints -Static", "NONE", "intArr5"); } @Test @Order(12) void readOneInt5ArrayCol() { - runner.setScaleFactors(2, 20); + setup(2, 20); runner.runParquetReadTest("ParquetRead- 1 Array Col of 5 Ints -Static"); } @Test @Order(13) void writeOneInt5VectorCol() { - runner.setScaleFactors(2, 16); + setup(2, 16); runner.runParquetWriteTest("ParquetWrite- 1 Vector Col of 5 Ints -Static", "NONE", "intVec5"); } @Test @Order(14) void readOneInt5VectorCol() { - runner.setScaleFactors(2, 16); + setup(2, 16); runner.runParquetReadTest("ParquetRead- 1 Vector Col of 5 Ints -Static"); } @Test @Order(15) void writeOneObjectArrayCol() { - runner.setScaleFactors(2, 1); + setup(2, 1); runner.runParquetWriteTest("ParquetWrite- 1 Array Col of 3 Strings and 2 Nulls -Static", "NONE", "objArr5"); } @Test @Order(16) void readOneObjectArrayCol() { - runner.setScaleFactors(2, 1); + setup(2, 1); runner.runParquetReadTest("ParquetRead- 1 Array Col of 3 Strings and 2 Nulls -Static"); } @Test @Order(17) void writeOneObjectVectorCol() { - runner.setScaleFactors(1, 1); + setup(1, 1); runner.runParquetWriteTest("ParquetWrite- 1 Vector Col of 3 String and 2 Nulls -Static", "NONE", "objVec5"); } @Test @Order(18) void readOneObjectVectorCol() { - runner.setScaleFactors(1, 1); + setup(1, 1); runner.runParquetReadTest("ParquetRead- 1 Vector Col of 3 String and 2 Nulls -Static"); } diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/file/S3ParquetMultiColTest.java b/src/it/java/io/deephaven/benchmark/tests/standard/file/S3ParquetMultiColTest.java new file mode 100644 index 00000000..62822971 --- /dev/null +++ b/src/it/java/io/deephaven/benchmark/tests/standard/file/S3ParquetMultiColTest.java @@ -0,0 +1,34 @@ +package io.deephaven.benchmark.tests.standard.file; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; + +/** + * 
Standard tests for writing/reading multi-column data with different codec/compression. To save time, the parquet + * generated by the "write" tests is used by the "read" tests. + */ +@TestMethodOrder(OrderAnnotation.class) +@Tag("Iterate") +class S3ParquetMultiColTest { + final String[] usedColumns = {"str10K", "long10K", "int10K", "short10K", "bigDec10K", "intArr5", "intVec5"}; + final FileTestRunner runner = new FileTestRunner(this); + + @BeforeEach + void setup() { + runner.setRequiredServices("deephaven", "minio"); + runner.setScaleFactors(3, 3); + } + + @Test + @Order(1) + void writeMultiColNone() { + runner.runParquetS3WriteTest("S3ParquetWrite- No Codec Multi Col -Static", usedColumns); + } + + @Test + @Order(2) + void readMultiColNone() { + runner.runParquetS3ReadTest("S3ParquetRead- No Codec Multi Col -Static"); + } + +} diff --git a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java index 1a78f7f9..650edae9 100644 --- a/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java +++ b/src/it/java/io/deephaven/benchmark/tests/standard/kafka/KafkaTestRunner.java @@ -5,6 +5,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.Set; import io.deephaven.benchmark.api.Bench; import io.deephaven.benchmark.controller.Controller; import io.deephaven.benchmark.controller.DeephavenDockerController; @@ -21,6 +22,7 @@ class KafkaTestRunner { final Object testInst; final Bench api; final Controller controller; + final Set requiredServices = Set.of("deephaven","redpanda"); private long rowCount; private int colCount; private String colType; @@ -51,7 +53,7 @@ void restartWithHeap(int deephavenHeapGigs) { return; dockerComposeFile = makeHeapAdjustedDockerCompose(dockerComposeFile, deephavenHeapGigs); var timer = api.timer(); - controller.restartService(); + controller.restartService(requiredServices); 
var metrics = new Metrics(Timer.now(), "test-runner", "setup.docker"); metrics.set("restart", timer.duration().toMillis(), "standard"); api.metrics().add(metrics); diff --git a/src/main/java/io/deephaven/benchmark/controller/Controller.java b/src/main/java/io/deephaven/benchmark/controller/Controller.java index 95e263ea..f1ace929 100644 --- a/src/main/java/io/deephaven/benchmark/controller/Controller.java +++ b/src/main/java/io/deephaven/benchmark/controller/Controller.java @@ -1,6 +1,9 @@ /* Copyright (c) 2022-2023 Deephaven Data Labs and Patent Pending */ package io.deephaven.benchmark.controller; +import java.util.Collection; +import java.util.Collections; + /** * Represents a mechanism that can manage the external service (e.g. Deephaven Engine) the benchmarks are running * against. This includes; start, stop, logging, etc. @@ -10,16 +13,32 @@ */ public interface Controller { /** - * Start the service according to the following contract: + * Start the services that match the given prefixes according to the following contract + *
    + *
  • If a service definition (e.g. docker-compose.yml) is not supplied, do nothing
  • + *
  • If a service definition is supplied, stop the existing service and clear state (e.g. logs)
  • + *
  • If a service definition is supplied, wait for the service to be in a usable state
  • + *
+ * + * @param services the service names to load or none to load all + * @return true if the service is running, otherwise false + */ + public boolean startService(Collection services); + + /** + * Start all services according to the following contract *
    *
  • If a service definition (e.g. docker-compose.yml) is not supplied, do nothing
  • *
  • If a service definition is supplied, stop the existing service and clear state (e.g. logs)
  • *
  • If a service definition is supplied, wait for the service to be in a usable state
  • *
* + * Equivalent to calling startService with an empty collection of service prefixes. * @return true if the service is running, otherwise false */ - public boolean startService(); + default public boolean startService() { + return startService(Collections.emptyList()); + } /** * Stop the service according to the follow contract: @@ -30,15 +49,40 @@ public interface Controller { * * @return true if the service definition is specified, otherwise false */ - public boolean stopService(); + public boolean stopService(Collection servicePrefixes); + + /** + * Stop all services according to the following contract: + *
    + *
  • If a service definition (e.g. docker-compose.yml) is not supplied, do nothing
  • + *
  • If a service definition is supplied, stop all services and clear state (e.g. logs)
  • + *
+ * + * @return true if the service definition is specified, otherwise false + */ + default public boolean stopService() { + return stopService(Collections.emptyList()); + } + + /** + * Stop all services, cleanup state, and start services that match the given prefixes. Implementors can simply call + * stopService followed by startService if desired. + * + * @param servicePrefixes the prefixes of the service names to load or none to load all + * @return true if the service restarted, otherwise false + */ + public boolean restartService(Collection servicePrefixes); /** - * Stop the service, cleanup state, and start it. Implementors can simply call stopService followed by - * startService if desired. + * Stop all services, cleanup state, and start all services. Implementors can simply call stopService + * followed by startService if desired. * + * Equivalent to calling restartService with an empty collection of service prefixes. * @return true if the service restarted, otherwise false */ - public boolean restartService(); + default public boolean restartService() { + return restartService(Collections.emptyList()); + } /** * Get the available log from the service. Results will vary depending on when the log state was last cleared.
diff --git a/src/main/java/io/deephaven/benchmark/controller/DeephavenDockerController.java b/src/main/java/io/deephaven/benchmark/controller/DeephavenDockerController.java index 313ac179..9e1035d3 100644 --- a/src/main/java/io/deephaven/benchmark/controller/DeephavenDockerController.java +++ b/src/main/java/io/deephaven/benchmark/controller/DeephavenDockerController.java @@ -6,8 +6,13 @@ import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import io.deephaven.benchmark.util.Exec; +import io.deephaven.benchmark.util.Strings; import io.deephaven.benchmark.util.Threads; /** @@ -33,45 +38,59 @@ public DeephavenDockerController(String composePath, String httpHostPort) { } /** - * Start the Deephaven service. If an existing Deephaven service is running, stop it first. If a docker compose file - * is not specified, do nothing. + * Start the services that match the given prefixes. If an existing Deephaven service is running, stop all services + * first. If a docker compose file is not specified, do nothing. 
* - * @return true if the service was started, otherwise false + * @param servicePrefixes prefixes of service names to start + * @return true if the services were started, otherwise false */ @Override - public boolean startService() { + public boolean startService(Collection servicePrefixes) { if (composePropPath.isBlank() || httpHostPort.isBlank()) return false; var composeRunPath = getRunningComposePath(); if (composeRunPath != null) exec("sudo", "docker", "compose", "-f", composeRunPath, "down"); - exec("sudo", "docker", "compose", "-f", composePropPath, "up", "-d"); - waitForEngineReady(); + var availableServices = listAvailableServices(composePropPath); + var services = Strings.startsWith(availableServices, servicePrefixes); + exec(Strings.toArray("sudo", "docker", "compose", "-f", composePropPath, "up", "-d", services)); + if (services.contains("deephaven") || (services.isEmpty() && availableServices.contains("deephaven"))) + waitForEngineReady(); + System.out.println("Running Services after Start: " + listRunningServices(composePropPath)); return true; } /** - * Stop the Deephaven service and remove the docker container. If no docker compose is specified, do nothing. + * Stop services that do NOT match the given prefixes and remove their docker containers. If no docker compose is + * specified, do nothing. 
* - * @return true if the service was stopped, otherwise false + * @return true if the services were stopped, otherwise false */ @Override - public boolean stopService() { + public boolean stopService(Collection keepServicePrefixes) { if (composePropPath.isBlank()) return false; - exec("sudo", "docker", "compose", "-f", composePropPath, "down", "--timeout", "0"); + + Set services = Collections.emptySet(); + if (!keepServicePrefixes.isEmpty()) { + services = listAvailableServices(composePropPath); + services.removeAll(Strings.startsWith(services, keepServicePrefixes)); + } + exec(Strings.toArray("sudo", "docker", "compose", "-f", composePropPath, "down", "--timeout", "0", services)); + System.out.println("Running Services after stop: " + listRunningServices(composePropPath)); return true; } /** - * Stop the Deephaven service and start it. + * Stop all services including Deephaven and start the services that match the given prefixes * - * @return true if the service was started, otherwise false + * @param services the prefixes of the service names to load or none to load all + * @return true if the services were started, otherwise false */ @Override - public boolean restartService() { + public boolean restartService(Collection servicePrefixes) { stopService(); - return startService(); + return startService(servicePrefixes); } /** @@ -141,6 +160,16 @@ ContainerInfo getContainerInfo(String containerId) { return parseContainerInfo(out); } + Set listAvailableServices(String composePath) { + var out = exec("sudo", "docker", "compose", "-f", composePath, "config", "--services"); + return parseServicesList(out); + } + + Set listRunningServices(String composePath) { + var out = exec("sudo", "docker", "compose", "-f", composePath, "ps", "--services"); + return parseServicesList(out); + } + List parseContainerIds(String dockerPsStr) { return dockerPsStr.lines().filter(s -> s.contains("/deephaven/server")) .map(s -> s.replaceAll("^([^ \t]+)[ \t].*$", "$1")).toList(); @@ -152,6 
+181,10 @@ ContainerInfo parseContainerInfo(String dockerInspectStr) { return new ContainerInfo(name, composeUri); } + Set parseServicesList(String servicesStr) { + return new LinkedHashSet(servicesStr.lines().map(s -> s.trim()).filter(s -> !s.isBlank()).toList()); + } + String getPropValue(String props, String name, String containsVal) { var matchName = "\"" + name + "\":"; var lines = props.lines().map(s -> s.trim()).filter(s -> s.startsWith(matchName) && s.contains(containsVal)); @@ -162,6 +195,7 @@ String getPropValue(String props, String name, String containsVal) { } String exec(String... command) { + // System.out.println("Exec: " + Arrays.asList(command)); return Exec.exec(workDir, command); } diff --git a/src/main/java/io/deephaven/benchmark/util/Strings.java b/src/main/java/io/deephaven/benchmark/util/Strings.java new file mode 100644 index 00000000..b6e94a23 --- /dev/null +++ b/src/main/java/io/deephaven/benchmark/util/Strings.java @@ -0,0 +1,49 @@ +/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.util; + +import java.util.*; + +/** + * Provide help with commonly used string manipulation + */ +public class Strings { + /** + * Convert all input objects to appropriate strings and make an array of strings + * + * @param vals a list of values (including Collections) to be converted + * @return an array of Strings + */ + static public String[] toArray(Object... vals) { + var arr = new ArrayList<>(); + for (Object val : vals) { + if (!(val instanceof Collection)) { + arr.add(val.toString()); + continue; + } + for (Object s : (Collection) val) { + arr.add(s.toString()); + } + } + return arr.toArray(new String[arr.size()]); + } + + /** + * Return a subset of available strings that start with the given prefixes. Throw an error + * if any prefixes are not found in available.
+ * + * @param available the list of strings to match against + * @param prefixes prefixes that available strings may start with + * @return an ordered set of available strings matched against prefixes + */ + static public Set startsWith(Collection available, Collection prefixes) { + var matches = new LinkedHashSet(); + for (String prefix : prefixes) { + var found = available.stream().filter(a -> a.startsWith(prefix)).toList(); + if (found.isEmpty()) + throw new RuntimeException("Required prefix not found: " + prefix); + matches.addAll(found); + } + return matches; + } + +} diff --git a/src/test/java/io/deephaven/benchmark/controller/DeephavenDockerControllerTest.java b/src/test/java/io/deephaven/benchmark/controller/DeephavenDockerControllerTest.java index a37e3bf8..846e006d 100644 --- a/src/test/java/io/deephaven/benchmark/controller/DeephavenDockerControllerTest.java +++ b/src/test/java/io/deephaven/benchmark/controller/DeephavenDockerControllerTest.java @@ -40,4 +40,18 @@ void parseContainerInfo() { "Wrong deephaven container compose uri"); } + @Test + void parseServicesList() { + var dockerServicesStr = """ + deephaven + minio + minio-bucket + redpanda + + """; + var c = new DeephavenDockerController(null, null); + var services = c.parseServicesList(dockerServicesStr); + assertEquals("[deephaven, minio, minio-bucket, redpanda]", services.toString(), "Bad services list"); + } + } diff --git a/src/test/java/io/deephaven/benchmark/util/StringsTest.java b/src/test/java/io/deephaven/benchmark/util/StringsTest.java new file mode 100644 index 00000000..f41c9a89 --- /dev/null +++ b/src/test/java/io/deephaven/benchmark/util/StringsTest.java @@ -0,0 +1,24 @@ +/* Copyright (c) 2022-2024 Deephaven Data Labs and Patent Pending */ +package io.deephaven.benchmark.util; + +import static org.junit.jupiter.api.Assertions.*; +import java.util.*; +import org.junit.jupiter.api.*; + +public class StringsTest { + @Test + void toArray() { + assertEquals("String[]", 
Strings.toArray("one", 2).getClass().getSimpleName()); + var arr = Arrays.asList(Strings.toArray("one", 2)); + assertEquals("[one, 2]", "" + arr); + assertEquals("[one, 2, one, 2]", "" + Arrays.asList(Strings.toArray("one", 2, arr))); + } + + @Test + void startsWith() { + var available = List.of("four", "three", "two", "one"); + var prefixes = List.of("tw", "th"); + assertEquals("[two, three]", Strings.startsWith(available, prefixes).toString()); + } + +}