Handle per operation logging (#239)
stanbrub authored Dec 27, 2023
1 parent ba4f342 commit ecd8b33
Showing 14 changed files with 453 additions and 97 deletions.
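A note on the change, not part of the committed files: the per-operation logging added here works by printing begin/end markers from inside the Deephaven query around each measured operation, so the engine's Docker log captured afterwards can be tied back to the operation that produced it. The sketch below only reproduces the marker format from the getLogSnippet helper added to StandardTestRunner further down in this diff; the operation name used is a made-up placeholder.

    // Reproduction of the getLogSnippet helper from StandardTestRunner (below) to show the
    // marker format; the operation name "Example- Op" is a made-up placeholder.
    public class LogMarkerSketch {
        static String getLogSnippet(String beginEnd, String name) {
            beginEnd = "BENCH_OPERATION_" + beginEnd.toUpperCase();
            return String.join(",", "<<<<< " + beginEnd, name, beginEnd + " >>>>>");
        }

        public static void main(String[] args) {
            // Printed before and after the measured operation inside the Deephaven query
            System.out.println(getLogSnippet("Begin", "Example- Op"));
            // -> <<<<< BENCH_OPERATION_BEGIN,Example- Op,BENCH_OPERATION_BEGIN >>>>>
            System.out.println(getLogSnippet("End", "Example- Op"));
            // -> <<<<< BENCH_OPERATION_END,Example- Op,BENCH_OPERATION_END >>>>>
        }
    }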
@@ -8,7 +8,7 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.util.Filer;

/**
@@ -35,15 +35,6 @@ public CompareTestRunner(Object testInst) {
this.testInst = testInst;
}

/**
* Get the Bench API instance for this runner
*
* @return the Bench API instance
*/
public Bench api() {
return api;
}

/**
* Download and place the given file into the environment Deephaven is running in. If the destination directory is
* specified as a relative path, the download file will be placed relative to the root of the virtual environment
@@ -365,7 +356,9 @@ void restartDocker() {
var api = Bench.create("# Docker Restart");
try {
api.setName("# Docker Restart");
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
var controller = new DeephavenDockerController(api.property("docker.compose.file", ""),
api.property("deephaven.addr", ""));
if (!controller.restartService())
return;
} finally {
api.close();
@@ -381,7 +374,8 @@ void restartDocker(int heapGigs) {
if (dockerComposeFile.isBlank() || deephavenHostPort.isBlank())
return;
dockerComposeFile = makeHeapAdjustedDockerCompose(dockerComposeFile, heapGigs);
Exec.restartDocker(dockerComposeFile, deephavenHostPort);
var controller = new DeephavenDockerController(dockerComposeFile, deephavenHostPort);
controller.restartService();
} finally {
api.close();
}
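The recurring change across these runner classes is the replacement of the static Exec.restartDocker(composeFile, addr) utility call with a DeephavenDockerController built from the same two properties. A minimal sketch of the new restart pattern, assuming only the constructor and restartService call shown in the diff; the wrapper class and method below are hypothetical.

    import io.deephaven.benchmark.api.Bench;
    import io.deephaven.benchmark.controller.DeephavenDockerController;

    // Hypothetical caller illustrating the replacement pattern; property keys and calls are
    // taken from the diff, the class and method names here are not.
    class ControllerRestartSketch {
        static boolean restartService(Bench api) {
            // Previously: Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", ""))
            var controller = new DeephavenDockerController(
                    api.property("docker.compose.file", ""),
                    api.property("deephaven.addr", ""));
            return controller.restartService();  // runners return early when this is false
        }
    }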
@@ -6,7 +6,8 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;

/**
* A wrapper for the Bench api that allows the running of small (single-operation) tests without requiring the
@@ -21,14 +22,14 @@ public class ExperimentalTestRunner {
final Object testInst;
private long scaleRowCount;
private Bench api;
private Controller controller;
private String sourceTable = "source";
private Map<String, String[]> supportTables = new LinkedHashMap<>();
private List<String> supportQueries = new ArrayList<>();

public ExperimentalTestRunner(Object testInst) {
this.testInst = testInst;
this.api = initialize(testInst);
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
initialize(testInst);
}

/**
@@ -219,8 +220,11 @@ Bench initialize(Object testInst) {
from deephaven.parquet import read
""";

Bench api = Bench.create(testInst);
restartDocker(api);
this.api = Bench.create(testInst);
this.controller = new DeephavenDockerController(api.property("docker.compose.file", ""),
api.property("deephaven.addr", ""));
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
controller.restartService();
api.query(query).execute();
return api;
}
@@ -229,12 +233,6 @@ String listStr(String... values) {
return String.join(", ", Arrays.stream(values).map(c -> "'" + c + "'").toList());
}

void restartDocker(Bench api) {
var dockerComposeFile = api.property("docker.compose.file", "");
var deephavenHostPort = api.property("deephaven.addr", "");
Exec.restartDocker(dockerComposeFile, deephavenHostPort);
}

void generateQuotesTable(long rowCount) {
api.table("quotes_g")
.add("Date", "string", "2023-01-04")
@@ -3,8 +3,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.time.Duration;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.util.Timer;

/**
@@ -82,7 +82,8 @@ void runTest(String testName, String tableName, long baseRowCount, long rowCount

void restartDocker(Bench api) {
var timer = api.timer();
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
var controller = new DeephavenDockerController(api.property("docker.compose.file", ""), api.property("deephaven.addr", ""));
if (!controller.restartService())
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "docker");
metrics.set("restart", timer.duration().toMillis(), "standard");
@@ -7,8 +7,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.util.Timer;

/**
@@ -19,21 +20,21 @@
* compact and readable, not to cover every possible case. Standard query API code can be used in conjunction as long as
* conventions are followed (ex. main file is "source")
*/
public class StandardTestRunner {
final public class StandardTestRunner {
final Object testInst;
final List<String> setupQueries = new ArrayList<>();
final List<String> supportTables = new ArrayList<>();
private String mainTable = "source";
private Bench api;
private Controller controller;
private long scaleRowCount;
private int staticFactor = 1;
private int incFactor = 1;
private int rowCountFactor = 1;

public StandardTestRunner(Object testInst) {
this.testInst = testInst;
this.api = initialize(testInst);
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
initialize(testInst);
}

/**
@@ -174,9 +175,11 @@ Result runStaticTest(String name, String operation, String read, String... loadC
garbage_collect()
bench_api_metrics_snapshot()
print('${logOperationBegin}')
begin_time = time.perf_counter_ns()
result = ${operation}
end_time = time.perf_counter_ns()
print('${logOperationEnd}')
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()
@@ -202,6 +205,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu
garbage_collect()
bench_api_metrics_snapshot()
print('${logOperationBegin}')
begin_time = time.perf_counter_ns()
result = ${operation}
source_filter.start()
@@ -210,6 +214,7 @@ Result runIncTest(String name, String operation, String read, String... loadColu
get_exec_ctx().update_graph.j_update_graph.requestRefresh()
source_filter.waitForCompletion()
end_time = time.perf_counter_ns()
print('${logOperationEnd}')
bench_api_metrics_snapshot()
standard_metrics = bench_api_metrics_collect()
@@ -224,14 +229,18 @@ Result runIncTest(String name, String operation, String read, String... loadColu

Result runTest(String name, String query, String operation, String read, String... loadColumns) {
if (api.isClosed())
api = initialize(testInst);
initialize(testInst);
api.setName(name);
var logBeginMarker = getLogSnippet("Begin", name);
var logEndMarker = getLogSnippet("End", name);
query = query.replace("${readTable}", read);
query = query.replace("${mainTable}", mainTable);
query = query.replace("${loadSupportTables}", loadSupportTables());
query = query.replace("${loadColumns}", listStr(loadColumns));
query = query.replace("${setupQueries}", String.join("\n", setupQueries));
query = query.replace("${operation}", operation);
query = query.replace("${logOperationBegin}", logBeginMarker);
query = query.replace("${logOperationEnd}", logEndMarker);

try {
var result = new AtomicReference<Result>();
@@ -252,6 +261,7 @@ Result runTest(String name, String query, String operation, String read, String.
api.result().test("deephaven-engine", result.get().elapsedTime(), result.get().loadedRowCount());
return result.get();
} finally {
addDockerLog(api, logBeginMarker, logEndMarker);
api.close();
}
}
@@ -265,23 +275,41 @@ String loadSupportTables() {
.collect(Collectors.joining(""));
}

Bench initialize(Object testInst) {
String getLogSnippet(String beginEnd, String name) {
beginEnd = "BENCH_OPERATION_" + beginEnd.toUpperCase();
return String.join(",", "<<<<< " + beginEnd, name, beginEnd + " >>>>>");
}

void initialize(Object testInst) {
var query = """
import time
from deephaven import new_table, empty_table, garbage_collect, merge
from deephaven.column import long_col, double_col
from deephaven.parquet import read
""";

Bench api = Bench.create(testInst);
restartDocker(api);
this.api = Bench.create(testInst);
this.controller = new DeephavenDockerController(api.property("docker.compose.file", ""),
api.property("deephaven.addr", ""));
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
restartDocker();
api.query(query).execute();
return api;
}

void restartDocker(Bench api) {
void addDockerLog(Bench api, String beginMarker, String endMarker) {
var timer = api.timer();
var logText = controller.getLog();
if (logText.isBlank())
return;
api.log().add("deephaven-engine", logText);
var metrics = new Metrics(Timer.now(), "test-runner", "teardown", "docker");
metrics.set("log", timer.duration().toMillis(), "standard");
api.metrics().add(metrics);
}

void restartDocker() {
var timer = api.timer();
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
if (!controller.restartService())
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "docker");
metrics.set("restart", timer.duration().toMillis(), "standard");
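The StandardTestRunner changes above carry the core of the per-operation flow: runTest substitutes the begin/end markers into the query template, the query prints them around the timed operation, and addDockerLog then pulls the service log through the controller and attaches it to the benchmark output. The Controller interface itself is not shown in this excerpt; the sketch below assumes only the two methods actually invoked in this diff and may not match the real declaration.

    // Assumed shape of io.deephaven.benchmark.controller.Controller, inferred solely from the
    // restartService() and getLog() calls in this diff; the real interface may declare more members.
    public interface Controller {
        // Restart the Deephaven service; runners skip the rest of their setup when this returns false
        boolean restartService();

        // Return the service log text; addDockerLog() adds it to the benchmark log as "deephaven-engine"
        String getLog();
    }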
@@ -5,8 +5,9 @@
import java.util.Arrays;
import java.util.stream.Collectors;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.util.Timer;

/**
@@ -15,16 +16,16 @@
class FileTestRunner {
final String parquetCfg = "max_dictionary_keys=1048576, max_dictionary_size=1048576, target_page_size=65536";
final Object testInst;
final Bench api;
private Bench api;
private Controller controller;
private double rowCountFactor = 1;
private int scaleFactor = 1;
private long scaleRowCount;
private boolean useParquetDefaultSettings = false;

FileTestRunner(Object testInst) {
this.testInst = testInst;
this.api = initialize(testInst);
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
initialize(testInst);
}

/**
@@ -248,8 +249,11 @@ private Bench initialize(Object testInst) {
from deephaven import dtypes as dht
""";

Bench api = Bench.create(testInst);
restartDocker(api);
this.api = Bench.create(testInst);
this.controller = new DeephavenDockerController(api.property("docker.compose.file", ""),
api.property("deephaven.addr", ""));
this.scaleRowCount = api.propertyAsIntegral("scale.row.count", "100000");
restartDocker();
api.query(query).execute();
return api;
}
@@ -259,9 +263,9 @@ private Bench initialize(Object testInst) {
*
* @param api the Bench API for this test runner.
*/
private void restartDocker(Bench api) {
private void restartDocker() {
var timer = api.timer();
if (!Exec.restartDocker(api.property("docker.compose.file", ""), api.property("deephaven.addr", "")))
if (!controller.restartService())
return;
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "docker");
metrics.set("restart", timer.duration().toMillis(), "standard");
@@ -6,8 +6,9 @@
import java.nio.file.Paths;
import java.time.Duration;
import io.deephaven.benchmark.api.Bench;
import io.deephaven.benchmark.controller.Controller;
import io.deephaven.benchmark.controller.DeephavenDockerController;
import io.deephaven.benchmark.metric.Metrics;
import io.deephaven.benchmark.util.Exec;
import io.deephaven.benchmark.util.Filer;
import io.deephaven.benchmark.util.Timer;

@@ -19,6 +20,7 @@
class KafkaTestRunner {
final Object testInst;
final Bench api;
final Controller controller;
private long rowCount;
private int colCount;
private String colType;
@@ -32,6 +34,8 @@ class KafkaTestRunner {
KafkaTestRunner(Object testInst) {
this.testInst = testInst;
this.api = Bench.create(testInst);
this.controller = new DeephavenDockerController(api.property("docker.compose.file", ""),
api.property("deephaven.addr", ""));
}

/**
Expand All @@ -41,7 +45,7 @@ class KafkaTestRunner {
* @param deephavenHeapGigs the number of gigabytes to use for Deephaven max heap
*/
void restartWithHeap(int deephavenHeapGigs) {
restartDocker(api, deephavenHeapGigs);
restartDocker(deephavenHeapGigs);
}

/**
Expand Down Expand Up @@ -179,14 +183,14 @@ private String getResultTableSize(String operation) {
return "result.size";
}

private void restartDocker(Bench api, int heapGigs) {
private void restartDocker(int heapGigs) {
String dockerComposeFile = api.property("docker.compose.file", "");
String deephavenHostPort = api.property("deephaven.addr", "");
if (dockerComposeFile.isBlank() || deephavenHostPort.isBlank())
return;
dockerComposeFile = makeHeapAdjustedDockerCompose(dockerComposeFile, heapGigs);
var timer = api.timer();
Exec.restartDocker(dockerComposeFile, deephavenHostPort);
controller.restartService();
var metrics = new Metrics(Timer.now(), "test-runner", "setup", "docker");
metrics.set("restart", timer.duration().toMillis(), "standard");
api.metrics().add(metrics);