From 3df43a9f9a28578963b00fac4abde7d342fcdeda Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 29 Jul 2024 23:09:32 +0800 Subject: [PATCH] perf-ycsb: support ycsb model for performance testing (#169) --- .github/workflows/dispatch-perf-image.yaml | 44 ++ perf-ycsb/Dockerfile | 21 + perf-ycsb/README.md | 20 + perf-ycsb/pom.xml | 155 +++++++ .../oxia/client/perf/ycsb/OxiaOptions.java | 31 ++ .../oxia/client/perf/ycsb/Worker.java | 421 ++++++++++++++++++ .../client/perf/ycsb/WorkerException.java | 27 ++ .../oxia/client/perf/ycsb/WorkerOptions.java | 222 +++++++++ .../oxia/client/perf/ycsb/WorkerStarter.java | 26 ++ .../generator/FixedLengthValueGenerator.java | 32 ++ .../client/perf/ycsb/generator/Generator.java | 21 + .../perf/ycsb/generator/GeneratorType.java | 36 ++ .../perf/ycsb/generator/Generators.java | 46 ++ .../ycsb/generator/KeyGeneratorOptions.java | 27 ++ .../ycsb/generator/OperationGenerator.java | 44 ++ .../generator/OperationGeneratorOptions.java | 24 + .../perf/ycsb/generator/OperationType.java | 22 + .../generator/SequentialKeyGenerator.java | 36 ++ .../ycsb/generator/UniformKeyGenerator.java | 33 ++ .../ycsb/generator/ZipfianKeyGenerator.java | 33 ++ .../perf/ycsb/operations/Operations.java | 25 ++ .../client/perf/ycsb/operations/Status.java | 55 +++ .../perf/ycsb/output/BenchmarkReport.java | 114 +++++ .../ycsb/output/BenchmarkReportSnapshot.java | 81 ++++ .../oxia/client/perf/ycsb/output/Doubles.java | 28 ++ .../perf/ycsb/output/HistogramSnapshot.java | 45 ++ .../client/perf/ycsb/output/LogOutput.java | 51 +++ .../oxia/client/perf/ycsb/output/Output.java | 23 + .../perf/ycsb/output/OutputException.java | 22 + .../perf/ycsb/output/OutputOptions.java | 22 + .../client/perf/ycsb/output/OutputTypes.java | 35 ++ .../oxia/client/perf/ycsb/output/Outputs.java | 36 ++ .../client/perf/ycsb/output/PulsarOutput.java | 61 +++ .../perf/ycsb/output/PulsarOutputOptions.java | 32 ++ pom.xml | 6 + 35 files changed, 1957 insertions(+) create mode 100644 .github/workflows/dispatch-perf-image.yaml create mode 100644 perf-ycsb/Dockerfile create mode 100644 perf-ycsb/README.md create mode 100644 perf-ycsb/pom.xml create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/OxiaOptions.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/Worker.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerException.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerOptions.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerStarter.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/FixedLengthValueGenerator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/GeneratorType.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generators.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/KeyGeneratorOptions.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGenerator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGeneratorOptions.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationType.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/SequentialKeyGenerator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/UniformKeyGenerator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/ZipfianKeyGenerator.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Operations.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Status.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReport.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReportSnapshot.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Doubles.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/HistogramSnapshot.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/LogOutput.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Output.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputException.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputOptions.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputTypes.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Outputs.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutput.java create mode 100644 perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutputOptions.java diff --git a/.github/workflows/dispatch-perf-image.yaml b/.github/workflows/dispatch-perf-image.yaml new file mode 100644 index 00000000..41ac8aee --- /dev/null +++ b/.github/workflows/dispatch-perf-image.yaml @@ -0,0 +1,44 @@ +name: DockerHub Publish + +on: + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + publish-docker: + name: Publish docker image + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: docker/setup-qemu-action@v2 + - uses: docker/setup-buildx-action@v2 + + - name: Docker meta + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ github.repository }} + tags: | + type=ref,event=branch + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKER_USER }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push + uses: docker/build-push-action@v3 + with: + context: ./perf-ycsb + platforms: linux/x86_64,linux/arm64 + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=registry,ref=${{ steps.meta.outputs.tags }} + cache-to: type=inline diff --git a/perf-ycsb/Dockerfile b/perf-ycsb/Dockerfile new file mode 100644 index 00000000..6868fe7e --- /dev/null +++ b/perf-ycsb/Dockerfile @@ -0,0 +1,21 @@ +# +# Copyright © 2022-2024 StreamNative Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM eclipse-temurin:21.0.3_9-jdk + +WORKDIR perf + +COPY ./target/oxia-perf-ycsb-0.3.1-SNAPSHOT.jar /perf/oxia-perf-ycsb.jar diff --git a/perf-ycsb/README.md b/perf-ycsb/README.md new file mode 100644 index 00000000..d4086226 --- /dev/null +++ b/perf-ycsb/README.md @@ -0,0 +1,20 @@ +# Oxia YCSB + +This is the tool for testing StreamNative oxia by the YCSB model. + +## Getting Started + +## Install + +1. Compile and package the `perf-ycsb` + +```shell +mvn clean package -DskipTests -P perf-ycsb -pl perf-ycsb +``` + +2. Run the command + +```shell +java -jar ./perf-ycsb/target/oxia-perf-ycsb-.jar ycsb +``` + diff --git a/perf-ycsb/pom.xml b/perf-ycsb/pom.xml new file mode 100644 index 00000000..5d1760e7 --- /dev/null +++ b/perf-ycsb/pom.xml @@ -0,0 +1,155 @@ + + + + 4.0.0 + + + io.streamnative.oxia + oxia-java + 0.3.1-SNAPSHOT + + + oxia-perf-ycsb + Oxia YCSB + + + 2.1.9 + 3.4.1 + + + + + ${project.groupId} + oxia-client-api + ${project.version} + + + com.fasterxml.jackson.core + jackson-databind + 2.17.2 + + + + info.picocli + picocli + 4.7.6 + + + io.opentelemetry + opentelemetry-exporter-logging + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-prometheus + ${opentelemetry.version}-alpha + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + ${opentelemetry.version} + + + io.streamnative.oxia + oxia-client + ${project.version} + + + org.apache.commons + commons-math3 + 3.6.1 + + + org.apache.pulsar + pulsar-client + 3.3.0 + + + org.hdrhistogram + HdrHistogram + ${hdr-histogram.version} + + + org.slf4j + slf4j-simple + ${slf4j.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 21 + 21 + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven.shade.plugin.version} + + + + shade + + package + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + *:* + + module-info.class + META-INF/MANIFEST.MF + + + + + + + + io.streamnative.oxia.client.perf.ycsb.WorkerStarter + + + + + + + + + diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/OxiaOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/OxiaOptions.java new file mode 100644 index 00000000..2d38045e --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/OxiaOptions.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb; + +import picocli.CommandLine; + +@CommandLine.Command( + name = "oxia", + subcommands = {WorkerOptions.class}) +public final class OxiaOptions implements Runnable { + + @CommandLine.Spec CommandLine.Model.CommandSpec spec; + + @Override + public void run() { + throw new CommandLine.ParameterException(spec.commandLine(), "Specify a subcommand"); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/Worker.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/Worker.java new file mode 100644 index 00000000..678a5c54 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/Worker.java @@ -0,0 +1,421 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.streamnative.oxia.client.api.*; +import io.streamnative.oxia.client.api.exceptions.OxiaException; +import io.streamnative.oxia.client.metrics.Unit; +import io.streamnative.oxia.client.perf.ycsb.generator.Generator; +import io.streamnative.oxia.client.perf.ycsb.generator.GeneratorType; +import io.streamnative.oxia.client.perf.ycsb.generator.Generators; +import io.streamnative.oxia.client.perf.ycsb.generator.KeyGeneratorOptions; +import io.streamnative.oxia.client.perf.ycsb.generator.OperationGeneratorOptions; +import io.streamnative.oxia.client.perf.ycsb.generator.OperationType; +import io.streamnative.oxia.client.perf.ycsb.operations.Operations; +import io.streamnative.oxia.client.perf.ycsb.operations.Status; +import io.streamnative.oxia.client.perf.ycsb.output.*; +import java.io.Closeable; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class Worker implements Runnable, Closeable, Operations { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final WorkerOptions options; + private final SyncOxiaClient client; + private final Generator keyGenerator; + private final Generator valueGenerator; + private final Generator operationGenerator; + private final Output intervalOutput; + private final Output globalOutput; + + private volatile CompletableFuture closeFuture; + + private final Semaphore outstandingSemaphore; + + /* Otl section */ + private final LongCounter operationCounter; + private final ObservableLongGauge outstandingRequestGauge; + private final Attributes operationWriteSuccessAttributes; + private final Attributes operationWriteFailedAttributes; + private final Attributes operationReadSuccessAttributes; + private final Attributes operationReadFailedAttributes; + + private final DoubleHistogram operationLatency; + private static final List LATENCY_BUCKET = + Lists.newArrayList( + .0005, .001, .0025, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, + 90.0, 120.0, 240.0); + private static final double MICROS = TimeUnit.SECONDS.toMicros(1); + + public Worker(WorkerOptions options, OpenTelemetry openTelemetry) { + try { + this.client = + OxiaClientBuilder.create(options.serviceAddr) + .batchLinger(Duration.ofMillis(options.batchLingerMs)) + .maxRequestsPerBatch(options.maxRequestsPerBatch) + .requestTimeout(Duration.ofMillis(options.requestTimeoutMs)) + .namespace(options.namespace) + .openTelemetry(openTelemetry) + .syncClient(); + } catch (OxiaException e) { + throw new WorkerException(e); + } + final GeneratorType generatorType = GeneratorType.fromString(options.keyDistribution); + + this.outstandingSemaphore = new Semaphore(options.maxOutstandingRequests); + this.keyGenerator = + Generators.createKeyGenerator( + new KeyGeneratorOptions( + generatorType, + options.keyPrefix, + options.lowerBound, + options.upperBound, + options.elements, + options.exponent)); + this.valueGenerator = Generators.createFixedLengthValueGenerator(options.valueSize); + this.operationGenerator = + Generators.createOperationGenerator( + new OperationGeneratorOptions( + options.writePercentage, options.readPercentage, options.scanPercentage)); + this.intervalOutput = Outputs.createLogOutput(false); + final OutputTypes outputTypes = OutputTypes.fromString(options.globalOutputType); + this.globalOutput = + Outputs.createOutput( + outputTypes, + new OutputOptions( + options.globalOutputLogPretty, + new PulsarOutputOptions( + options.globalOutputPulsarServiceURL, + options.globalOutputPulsarTargetTopic, + options.globalOutputPulsarAuthenticationPlugin, + options.globalOutputPulsarAuthenticationParams))); + this.options = options; + final var meter = openTelemetry.getMeter("io.streamnative.oxia.perf-ycsb"); + this.operationCounter = + meter + .counterBuilder("oxia.perf.ycsb.op") + .setDescription("oxia perf operation counter") + .setUnit(Unit.Requests.toString()) + .build(); + this.outstandingRequestGauge = + meter + .gaugeBuilder("oxia.perf.ycsb.op.outstanding") + .setDescription("oxia outstanding request") + .setUnit(Unit.Requests.toString()) + .ofLongs() + .buildWithCallback( + (ob) -> { + ob.record( + options.maxOutstandingRequests - outstandingSemaphore.availablePermits(), + Attributes.builder() + .put("operation.num", options.operationNum) + .put("value.size", options.valueSize) + .put("worker", options.workerName) + .put("env", options.envName) + .put("env.shards", options.envShards) + .build()); + }); + this.operationLatency = + meter + .histogramBuilder("oxia.perf.ycsb.op.second") + .setUnit(Unit.Seconds.toString()) + .setDescription("oxia perf operation latency") + .setExplicitBucketBoundariesAdvice(LATENCY_BUCKET) + .build(); + this.operationWriteSuccessAttributes = + Attributes.builder() + .put("type", "write") + .put("operation.num", options.operationNum) + .put("value.size", options.valueSize) + .put("worker", options.workerName) + .put("env", options.envName) + .put("env.shards", options.envShards) + .put("response", "success") + .build(); + this.operationWriteFailedAttributes = + Attributes.builder() + .put("type", "write") + .put("operation.num", options.operationNum) + .put("value.size", options.valueSize) + .put("worker", options.workerName) + .put("env", options.envName) + .put("env.shards", options.envShards) + .put("response", "failed") + .build(); + this.operationReadSuccessAttributes = + Attributes.builder() + .put("type", "read") + .put("operation.num", options.operationNum) + .put("value.size", options.valueSize) + .put("worker", options.workerName) + .put("env", options.envName) + .put("env.shards", options.envShards) + .put("response", "success") + .build(); + this.operationReadFailedAttributes = + Attributes.builder() + .put("type", "read") + .put("operation.num", options.operationNum) + .put("value.size", options.valueSize) + .put("worker", options.workerName) + .put("env", options.envName) + .put("env.shards", options.envShards) + .put("response", "failed") + .build(); + } + + @SuppressWarnings("UnstableApiUsage") + @Override + public void run() { + try { + final String optionsStr = MAPPER.writeValueAsString(options); + log.info("starting worker. the options={}", optionsStr); + } catch (JsonProcessingException ex) { + throw new WorkerException(ex); + } + + final RateLimiter operationRatelimiter = RateLimiter.create(options.requestsRate); + final int maxOutstandingRequests = options.maxOutstandingRequests; + + final BenchmarkReport globalReport = BenchmarkReport.createDefault(); + final BenchmarkReport intervalReport = BenchmarkReport.createDefault(); + + final Function globalSnapshotFunc = + globalReport.snapshotFunc(options, false); + final Function internalSnapshotFunc = + intervalReport.snapshotFunc(options, true); + + log.info("performance test is starting"); + final AtomicLong operationNum = new AtomicLong(options.operationNum); + final Thread intervalOutputTask = + Thread.ofVirtual() + .start( + () -> { + log.info("starting interval output task."); + long lastSnapshotTime = System.nanoTime(); + //noinspection InfiniteLoopStatement + while (true) { + try { + //noinspection BusyWait + Thread.sleep(options.intervalOutputSec * 1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("exit interval output thread while sleeping by interrupt"); + return; + } + + intervalOutput.report(internalSnapshotFunc.apply(lastSnapshotTime)); + if (options.operationNum > 0) { + log.info("remain operation num {}", operationNum.get()); + } + lastSnapshotTime = System.nanoTime(); + } + }); + final long taskStartTime = System.nanoTime(); + while ((options.operationNum > 0 ? operationNum.getAndDecrement() > 0 : closeFuture == null)) { + // jump out by closing worker + operationRatelimiter.acquire(); + try { + outstandingSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new WorkerException(e); + } + + final OperationType operationType = operationGenerator.nextValue(); + final String key = keyGenerator.nextValue(); + final byte[] value = valueGenerator.nextValue(); + Thread.ofVirtual() + .start( + () -> { + try { + switch (operationType) { + case WRITE -> { + globalReport.writeTotal().increment(); + intervalReport.writeTotal().increment(); + final long start = System.nanoTime(); + final Status sts; + if (options.writeWithSequence) { + sts = writeWithSequence(key, value); + } else { + sts = write(key, value); + } + if (!sts.isSuccess()) { + operationCounter.add(1, operationWriteFailedAttributes); + log.warn("write failed. the error info {}", sts.getErrorInfo()); + final long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - start); + operationLatency.record( + latencyMicros / MICROS, operationWriteFailedAttributes); + globalReport.writeFailed().increment(); + intervalReport.writeFailed().increment(); + } else { + operationCounter.add(1, operationWriteSuccessAttributes); + final long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - start); + operationLatency.record( + latencyMicros / MICROS, operationWriteSuccessAttributes); + globalReport.writeLatency().recordValue(latencyMicros); + intervalReport.writeLatency().recordValue(latencyMicros); + } + } + case READ -> { + globalReport.readTotal().increment(); + intervalReport.readTotal().increment(); + final long start = System.nanoTime(); + final Status sts = read(key); + if (!sts.isSuccess()) { + operationCounter.add(1, operationReadFailedAttributes); + final long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - start); + operationLatency.record( + latencyMicros / MICROS, operationReadFailedAttributes); + log.warn("read failed. the error info {}", sts.getErrorInfo()); + globalReport.readFailed().increment(); + intervalReport.readFailed().increment(); + } else { + operationCounter.add(1, operationReadSuccessAttributes); + final long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - start); + operationLatency.record( + latencyMicros / MICROS, operationReadSuccessAttributes); + globalReport.readLatency().recordValue(latencyMicros); + intervalReport.readLatency().recordValue(latencyMicros); + } + } + default -> throw new UnsupportedOperationException("unsupported yet"); + } + } finally { + outstandingSemaphore.release(); + } + }); + } + + try { + outstandingSemaphore.acquire(maxOutstandingRequests); // acquire all of permits + } catch (InterruptedException e) { + throw new WorkerException(e); + } + final BenchmarkReportSnapshot globalSnapshot = globalSnapshotFunc.apply(taskStartTime); + + // interrupt the interval output task + intervalOutputTask.interrupt(); + + globalOutput.report(globalSnapshot); + + if (closeFuture == null) { + synchronized (this) { + if (closeFuture == null) { + // avoid close after running + closeFuture = CompletableFuture.completedFuture(null); + } + } + } else { + if (!closeFuture.complete(null)) { + log.warn("bug! unexpected behaviour: completed future and empty close future"); + } + } + log.info("performance test is done"); + } + + @Override + public void close() { + // mark the worker is closing + if (closeFuture == null) { + synchronized (this) { + if (closeFuture == null) { + closeFuture = new CompletableFuture<>(); + } + } + } + // wait for task run complete + closeFuture.join(); + try { + client.close(); + if (globalOutput != null) { + globalOutput.close(); + } + if (intervalOutput != null) { + intervalOutput.close(); + } + } catch (Exception ex) { + throw new WorkerException(ex); + } + + outstandingRequestGauge.close(); // close observer + } + + @Override + public Status write(String key, byte[] value) { + try { + final PutResult result = client.put(key, value); + if (result != null) { + return Status.success(); + } + return Status.failed("empty result"); + } catch (Throwable ex) { + return Status.failed(ex.getMessage()); + } + } + + @Override + public Status writeWithSequence(String key, byte[] value) { + try { + final PutResult result = + client.put( + key, + value, + Set.of( + PutOption.PartitionKey(key), + PutOption.SequenceKeysDeltas(List.of(1L, (long) value.length)))); + if (result != null) { + return Status.success(); + } + return Status.failed("empty result"); + } catch (Throwable ex) { + return Status.failed(ex.getMessage()); + } + } + + @Override + public Status read(String key) { + try { + final GetResult result = client.get(key); + if (result != null) { + return Status.success(result.getValue()); + } + return Status.failed("empty result"); + } catch (Throwable ex) { + return Status.failed(ex.getMessage()); + } + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerException.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerException.java new file mode 100644 index 00000000..ee3af479 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerException.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb; + +public final class WorkerException extends RuntimeException { + + public WorkerException(String message) { + super(message); + } + + public WorkerException(Throwable cause) { + super(cause); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerOptions.java new file mode 100644 index 00000000..c527c000 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerOptions.java @@ -0,0 +1,222 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb; + +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; +import io.streamnative.oxia.client.OxiaClientBuilderImpl; +import lombok.extern.slf4j.Slf4j; +import picocli.CommandLine; + +import java.util.concurrent.TimeUnit; + +@Slf4j +@CommandLine.Command(name = "ycsb") +public final class WorkerOptions implements Runnable { + + @CommandLine.Option( + names = {"--name"}, + required = true, + description = "worker name for this test" + ) + String workerName = ""; + + @CommandLine.Option( + names = {"--env-name"}, + description = "the worker environment name" + ) + String envName = ""; + + @CommandLine.Option( + names = {"--env-shards"}, + description = "the worker environment shards number" + ) + int envShards = 0; + + @CommandLine.Option( + names = {"--min-exit-time-sec"}, + description = "minimal exit time in the second") + int minExitTimeSec = 60 * 5; // at least exist 3 min to let metrics collector + + @CommandLine.Option( + names = {"--service-addr"}, + description = "Oxia Service Address") + String serviceAddr = "localhost:6648"; + + @CommandLine.Option( + names = {"--namespace"}, + description = "Oxia namespace") + String namespace = "default"; + + @CommandLine.Option( + names = {"--rate"}, + description = "Request rate, ops/s") + double requestsRate = 1000.0; + + @CommandLine.Option( + names = {"--batch-linger-ms"}, + description = "Batch linger time") + long batchLingerMs = OxiaClientBuilderImpl.DefaultBatchLinger.toMillis(); + + @CommandLine.Option( + names = {"--max-requests-per-batch"}, + description = "Maximum requests per batch") + int maxRequestsPerBatch = OxiaClientBuilderImpl.DefaultMaxRequestsPerBatch; + + @CommandLine.Option( + names = {"--request-timeout-ms"}, + description = "Requests timeout") + long requestTimeoutMs = OxiaClientBuilderImpl.DefaultRequestTimeout.toMillis(); + + @CommandLine.Option( + names = {"--max-outstanding-requests"}, + description = "Max number of outstanding requests to server") + int maxOutstandingRequests = 100_000; + + /* Operations */ + @CommandLine.Option( + names = {"--write-percent"}, + description = "Percentage of write requests, compared to total requests") + double writePercentage = 20.0; + + @CommandLine.Option( + names = {"--scan-percent"}, + description = "Percentage of scan requests, compared to total requests") + double scanPercentage = 0.0; + + @CommandLine.Option( + names = {"--read-percent"}, + description = "Percentage of read requests, compared to total requests") + double readPercentage = 80.0; + + @CommandLine.Option( + names = {"--operation-num"}, + description = "Num of total operations. 0 means no limit") + long operationNum = 1000; + + @CommandLine.Option( + names = {"--write-with-sequence"}, + description = "Whether enable write with sequence." + ) + boolean writeWithSequence = false; + + /* Generation */ + + @CommandLine.Option( + names = {"--key-distribution"}, + description = "The key distribution. support uniform,zipfian,sequential") + String keyDistribution = "sequential"; + + @CommandLine.Option( + names = {"--key-prefix"}, + description = "The key prefix.") + String keyPrefix = "key-"; + + /* Uniform distribution */ + @CommandLine.Option( + names = {"--key-uniform-lower-bound"}, + description = + "The lower bound of uniform key distribution.(inclusive)") + int lowerBound = 0; + + @CommandLine.Option( + names = {"--key-uniform-upper-bound"}, + description = + "The upper bound of uniform key distribution.(inclusive)") + int upperBound = 100_000; + + + @CommandLine.Option( + names = {"--key-zipfian-elements"}, + description = "The number of elements for zipfain distribution" + ) + int elements = 100_000; + + @CommandLine.Option( + names = {"--key-zipfian-exponent"}, + description = """ + The exponent for zipfain distribution. + When s=1, the Zipf distribution degenerates into a classic power-law distribution. + When s>1, the distribution will be more sparse, meaning fewer elements will occupy a larger proportion. + When s<1, the distribution will be more dense, meaning more elements will occupy a larger proportion. + """ + ) + double exponent = 1.0; + + /* Value Generation */ + @CommandLine.Option( + names = {"--value-size"}, + description = "Size of the values to write") + int valueSize = 64; + + /* Output */ + + @CommandLine.Option( + names = {"--output-interval"}, + description = "The interval(second) of output") + int intervalOutputSec = 10; + + @CommandLine.Option( + names = {"--output-global-type"}, + description = "The type of global output. supported: log,pulsar" + ) + String globalOutputType = "log"; + + /* output log */ + @CommandLine.Option( + names = {"--output-global-log-pretty"}, + description = "Whether pretty the data for the log output type." + ) + boolean globalOutputLogPretty = true; + + /* output pulsar */ + @CommandLine.Option( + names = {"--output-global-pulsar-service-url"}, + description = "The pulsar service URL." + ) + String globalOutputPulsarServiceURL; + @CommandLine.Option( + names = {"--output-global-pulsar-target-topic"}, + description = "The target pulsar service topic." + ) + String globalOutputPulsarTargetTopic; + @CommandLine.Option( + names = {"--output-global-pulsar-authentication-plugin"}, + description = "The authentication plugin name." + ) + String globalOutputPulsarAuthenticationPlugin; + @CommandLine.Option( + names = {"--output-global-pulsar-authentication-params"}, + description = "The authentication plugin params." + ) + String globalOutputPulsarAuthenticationParams; + + @Override + public void run() { + final var sdk = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk(); + try (Worker worker = new Worker(this, sdk)) { + final long startRunningTime = System.currentTimeMillis(); + worker.run(); + long elapsedMs = System.currentTimeMillis() - startRunningTime; + if (TimeUnit.MILLISECONDS.toSeconds(elapsedMs) < minExitTimeSec) { + Thread.sleep(TimeUnit.SECONDS.toMillis(elapsedMs)); + } + } catch (Throwable ex) { + log.error("unexpected error. ", ex); + } + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerStarter.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerStarter.java new file mode 100644 index 00000000..58cb0a26 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/WorkerStarter.java @@ -0,0 +1,26 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb; + +import picocli.CommandLine; + +public final class WorkerStarter { + + public static void main(String[] args) { + final int exitCode = new CommandLine(new OxiaOptions()).execute(args); + System.exit(exitCode); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/FixedLengthValueGenerator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/FixedLengthValueGenerator.java new file mode 100644 index 00000000..9d8461c1 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/FixedLengthValueGenerator.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import java.util.concurrent.ThreadLocalRandom; + +final class FixedLengthValueGenerator implements Generator { + private final byte[] payload; + + public FixedLengthValueGenerator(int size) { + payload = new byte[size]; + ThreadLocalRandom.current().nextBytes(payload); + } + + @Override + public byte[] nextValue() { + return payload; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generator.java new file mode 100644 index 00000000..d750dbd8 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generator.java @@ -0,0 +1,21 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +public interface Generator { + + V nextValue(); +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/GeneratorType.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/GeneratorType.java new file mode 100644 index 00000000..07ede687 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/GeneratorType.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import static java.util.Objects.requireNonNull; + +import java.util.Locale; + +public enum GeneratorType { + UNIFORM, + ZIPFIAN, + SEQUENTIAL; + + public static GeneratorType fromString(String type) { + requireNonNull(type); + for (GeneratorType gType : GeneratorType.values()) { + if (gType.name().toLowerCase(Locale.ROOT).equals(type.toLowerCase(Locale.ROOT))) { + return gType; + } + } + throw new IllegalArgumentException("unknown generator type:" + type); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generators.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generators.java new file mode 100644 index 00000000..f68b92f2 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/Generators.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import static java.util.Objects.requireNonNull; + +public final class Generators { + + public static Generator createKeyGenerator(KeyGeneratorOptions options) { + return switch (options.type()) { + case SEQUENTIAL -> new SequentialKeyGenerator(options); + case UNIFORM -> new UniformKeyGenerator(options); + case ZIPFIAN -> new ZipfianKeyGenerator(options); + }; + } + + public static Generator createOperationGenerator( + OperationGeneratorOptions options) { + requireNonNull(options); + if (!options.validate()) { + throw new IllegalArgumentException( + "not validate operation. The probabilities do not sum to 100% "); + } + return new OperationGenerator(options); + } + + public static Generator createFixedLengthValueGenerator(int size) { + if (size <= 0) { + throw new IllegalArgumentException("size can not lower than or equals to 0"); + } + return new FixedLengthValueGenerator(size); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/KeyGeneratorOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/KeyGeneratorOptions.java new file mode 100644 index 00000000..d9a9ea6d --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/KeyGeneratorOptions.java @@ -0,0 +1,27 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +public record KeyGeneratorOptions( + GeneratorType type, + /* common parts */ + String prefix, + /* Uniform */ + int lowerBound, + int upperBound, + /* Zipfian */ + int elements, + double exponent) {} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGenerator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGenerator.java new file mode 100644 index 00000000..8923c15d --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGenerator.java @@ -0,0 +1,44 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import java.util.concurrent.ThreadLocalRandom; + +public class OperationGenerator implements Generator { + private final double writeThreshold; + private final double readThreshold; + private final double scanThreshold; + + public OperationGenerator(OperationGeneratorOptions options) { + this.writeThreshold = options.writePercentage(); + this.readThreshold = options.readPercentage() + writeThreshold; + this.scanThreshold = + options.scanPercentage() > 0 ? options.scanPercentage() + readThreshold : -1; + } + + @Override + public OperationType nextValue() { + final int rand = ThreadLocalRandom.current().nextInt(100); + if (rand < writeThreshold) { + return OperationType.WRITE; + } else if (rand < readThreshold) { + return OperationType.READ; + } else if (rand < scanThreshold) { + return OperationType.SCAN; + } + throw new IllegalStateException("out of bound: " + rand); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGeneratorOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGeneratorOptions.java new file mode 100644 index 00000000..7d8911ff --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationGeneratorOptions.java @@ -0,0 +1,24 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +public record OperationGeneratorOptions( + double writePercentage, double readPercentage, double scanPercentage) { + + public boolean validate() { + return writePercentage + scanPercentage + readPercentage == 100.0; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationType.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationType.java new file mode 100644 index 00000000..aa5d5c2f --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/OperationType.java @@ -0,0 +1,22 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +public enum OperationType { + WRITE, + READ, + SCAN +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/SequentialKeyGenerator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/SequentialKeyGenerator.java new file mode 100644 index 00000000..9a969437 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/SequentialKeyGenerator.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import java.util.concurrent.atomic.AtomicLong; + +final class SequentialKeyGenerator implements Generator { + private final AtomicLong counter = new AtomicLong(0); + private final String prefix; + + public SequentialKeyGenerator(KeyGeneratorOptions options) { + this.prefix = options.prefix(); + } + + @Override + public String nextValue() { + final long nextNum = counter.getAndIncrement(); + if (prefix == null) { + return nextNum + ""; + } + return prefix + nextNum; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/UniformKeyGenerator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/UniformKeyGenerator.java new file mode 100644 index 00000000..8daf7a2f --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/UniformKeyGenerator.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import org.apache.commons.math3.distribution.UniformIntegerDistribution; + +final class UniformKeyGenerator implements Generator { + private final String prefix; + private final UniformIntegerDistribution distribution; + + public UniformKeyGenerator(KeyGeneratorOptions options) { + this.prefix = options.prefix(); + this.distribution = new UniformIntegerDistribution(options.lowerBound(), options.upperBound()); + } + + @Override + public String nextValue() { + return prefix + distribution.sample(); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/ZipfianKeyGenerator.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/ZipfianKeyGenerator.java new file mode 100644 index 00000000..4f73aa35 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/generator/ZipfianKeyGenerator.java @@ -0,0 +1,33 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.generator; + +import org.apache.commons.math3.distribution.ZipfDistribution; + +final class ZipfianKeyGenerator implements Generator { + private final ZipfDistribution distribution; + private final String prefix; + + public ZipfianKeyGenerator(KeyGeneratorOptions options) { + this.prefix = options.prefix(); + this.distribution = new ZipfDistribution(options.elements(), options.exponent()); + } + + @Override + public String nextValue() { + return prefix + distribution.sample(); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Operations.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Operations.java new file mode 100644 index 00000000..84c1c959 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Operations.java @@ -0,0 +1,25 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.operations; + +public interface Operations { + + Status read(String key); + + Status write(String key, byte[] value); + + Status writeWithSequence(String key, byte[] value); +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Status.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Status.java new file mode 100644 index 00000000..a3987b5a --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/operations/Status.java @@ -0,0 +1,55 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.operations; + +public final class Status { + private int code; + private byte[] payload; + private String errorInfo; + + private Status() {} + + public String getErrorInfo() { + return errorInfo; + } + + public static Status success() { + final Status status = new Status(); + status.code = CODE_SUCCESS; + return status; + } + + public static Status success(byte[] payload) { + final Status status = new Status(); + status.code = CODE_SUCCESS; + status.payload = payload; + return status; + } + + public boolean isSuccess() { + return code == CODE_SUCCESS; + } + + public static Status failed(String errorInfo) { + final Status status = new Status(); + status.code = CODE_UNKNOWN; + status.errorInfo = errorInfo; + return status; + } + + public static final int CODE_SUCCESS = 0; + public static final int CODE_UNKNOWN = 1; +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReport.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReport.java new file mode 100644 index 00000000..f73e170b --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReport.java @@ -0,0 +1,114 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import io.streamnative.oxia.client.perf.ycsb.WorkerOptions; + +import java.math.BigDecimal; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; + +@Deprecated +public record BenchmarkReport( + LongAdder writeTotal, + LongAdder writeFailed, + Recorder writeLatency, + LongAdder readTotal, + LongAdder readFailed, + Recorder readLatency) { + public static BenchmarkReport createDefault() { + return new BenchmarkReport( + new LongAdder(), + new LongAdder(), + new Recorder(TimeUnit.SECONDS.toMicros(120_000), 5), + new LongAdder(), + new LongAdder(), + new Recorder(TimeUnit.SECONDS.toMicros(120_000), 5)); + } + + + public Function snapshotFunc(WorkerOptions options, boolean interval) { + // recycler objects + final AtomicReference writeHistogramRef = new AtomicReference<>(); + final AtomicReference readHistogramRef = new AtomicReference<>(); + + // lambda + return (taskStartTime) -> { + + double elapsed = (System.nanoTime() - taskStartTime) / 1e9; + // write section + final long totalWrite = writeTotal().sumThenReset(); + final double writeOps = Doubles.format2Scale(totalWrite / elapsed); + final long totalWriteFailed = writeFailed().sumThenReset(); + final double writeFailedOps = Doubles.format2Scale(totalWriteFailed / elapsed); + + // read section + final long totalRead = readTotal().sumThenReset(); + final double readOps = Doubles.format2Scale(totalRead / elapsed); + final long totalReadFailed = readFailed().sumThenReset(); + final double readFailedOps = Doubles.format2Scale(totalReadFailed / elapsed); + + writeHistogramRef.setRelease(writeLatency().getIntervalHistogram(writeHistogramRef.get())); + final HistogramSnapshot writeLatencySnapshot = + HistogramSnapshot.fromHistogram(writeHistogramRef.get()); + + readHistogramRef.setRelease(readLatency().getIntervalHistogram(readHistogramRef.get())); + final HistogramSnapshot readLatencySnapshot = + HistogramSnapshot.fromHistogram(readHistogramRef.get()); + + writeHistogramRef.get().reset(); + readHistogramRef.get().reset(); + + /* + Interval snapshot don't need total values. + */ + if (interval) { + return new BenchmarkReportSnapshot( + null, + System.currentTimeMillis(), + 0, + writeOps, + 0, + writeFailedOps, + writeLatencySnapshot, + 0, + readOps, + 0, + readFailedOps, + readLatencySnapshot); + } else { + return new BenchmarkReportSnapshot( + options, + System.currentTimeMillis(), + totalWrite, + writeOps, + totalWriteFailed, + writeFailedOps, + writeLatencySnapshot, + totalRead, + readOps, + totalReadFailed, + readFailedOps, + readLatencySnapshot); + } + }; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReportSnapshot.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReportSnapshot.java new file mode 100644 index 00000000..f4b358e7 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/BenchmarkReportSnapshot.java @@ -0,0 +1,81 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import com.fasterxml.jackson.annotation.JsonInclude; +import io.streamnative.oxia.client.perf.ycsb.WorkerOptions; +import lombok.Data; + +@Data +public final class BenchmarkReportSnapshot { + @JsonInclude(JsonInclude.Include.NON_NULL) + private final WorkerOptions definition; + + private final long timestamp; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final long totalWrite; + + private final double writeOps; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final long totalFailedWrite; + + private final double writeFps; + private final HistogramSnapshot writeLatencyMs; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final long totalRead; + + private final double readOps; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) + private final long totalFailedRead; + + private final double readFps; + private final HistogramSnapshot readLatencyMs; + + public BenchmarkReportSnapshot( + /* definitions section */ + @JsonInclude(JsonInclude.Include.NON_NULL) WorkerOptions definition, + /* metadata section */ + long timestamp, + /* ops write section */ + @JsonInclude(JsonInclude.Include.NON_DEFAULT) long totalWrite, + double writeOps, + @JsonInclude(JsonInclude.Include.NON_DEFAULT) long totalFailedWrite, + double writeFps, + HistogramSnapshot writeLatencyMs, + /* ops read section */ + @JsonInclude(JsonInclude.Include.NON_DEFAULT) long totalRead, + double readOps, + @JsonInclude(JsonInclude.Include.NON_DEFAULT) long totalFailedRead, + double readFps, + HistogramSnapshot readLatencyMs) { + this.definition = definition; + this.timestamp = timestamp; + this.totalWrite = totalWrite; + this.writeOps = writeOps; + this.totalFailedWrite = totalFailedWrite; + this.writeFps = writeFps; + this.writeLatencyMs = writeLatencyMs; + this.totalRead = totalRead; + this.readOps = readOps; + this.totalFailedRead = totalFailedRead; + this.readFps = readFps; + this.readLatencyMs = readLatencyMs; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Doubles.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Doubles.java new file mode 100644 index 00000000..61ff518a --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Doubles.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +public final class Doubles { + + public static double format2Scale(double value) { + BigDecimal decimal = new BigDecimal(value); + decimal = decimal.setScale(2, RoundingMode.UP); + return decimal.doubleValue(); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/HistogramSnapshot.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/HistogramSnapshot.java new file mode 100644 index 00000000..1dcbacb0 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/HistogramSnapshot.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import lombok.Data; +import org.HdrHistogram.Histogram; + +@Data +public final class HistogramSnapshot { + private final double p50; + private final double p95; + private final double p99; + private final double p999; + private final double max; + + public HistogramSnapshot(double p50, double p95, double p99, double p999, double max) { + this.p50 = p50; + this.p95 = p95; + this.p99 = p99; + this.p999 = p999; + this.max = max; + } + + public static HistogramSnapshot fromHistogram(Histogram histogram) { + return new HistogramSnapshot( + histogram.getValueAtPercentile(50) / 1000.0, + histogram.getValueAtPercentile(95) / 1000.0, + histogram.getValueAtPercentile(99) / 1000.0, + histogram.getValueAtPercentile(999) / 1000.0, + histogram.getMaxValue() / 1000.0); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/LogOutput.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/LogOutput.java new file mode 100644 index 00000000..d9dcbc48 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/LogOutput.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +final class LogOutput implements Output { + private static final ObjectMapper mapper = new ObjectMapper(); + private final boolean pretty; + + public LogOutput(boolean pretty) { + this.pretty = pretty; + mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + } + + @Override + public void report(BenchmarkReportSnapshot report) { + final String s; + try { + if (pretty) { + s = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(report); + } else { + s = mapper.writeValueAsString(report); + } + } catch (Throwable ex) { + throw new OutputException(ex.getMessage()); + } + log.info(s); + } + + @Override + public void close() {} +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Output.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Output.java new file mode 100644 index 00000000..9effb6a3 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Output.java @@ -0,0 +1,23 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import java.io.Closeable; + +public interface Output extends Closeable { + + void report(BenchmarkReportSnapshot report); +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputException.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputException.java new file mode 100644 index 00000000..26403297 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputException.java @@ -0,0 +1,22 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +public final class OutputException extends RuntimeException { + public OutputException(String message) { + super(message); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputOptions.java new file mode 100644 index 00000000..5b427741 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputOptions.java @@ -0,0 +1,22 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +public record OutputOptions( + /* log output */ + boolean pretty, + /* pulsar output */ + PulsarOutputOptions pulsarOptions) {} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputTypes.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputTypes.java new file mode 100644 index 00000000..e4ee0570 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/OutputTypes.java @@ -0,0 +1,35 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import static java.util.Objects.requireNonNull; + +import java.util.Locale; + +public enum OutputTypes { + LOG, + PULSAR; + + public static OutputTypes fromString(String type) { + requireNonNull(type); + for (OutputTypes gType : OutputTypes.values()) { + if (gType.name().toLowerCase(Locale.ROOT).equals(type.toLowerCase(Locale.ROOT))) { + return gType; + } + } + throw new IllegalArgumentException("unknown generator type:" + type); + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Outputs.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Outputs.java new file mode 100644 index 00000000..8c500ca5 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/Outputs.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +public final class Outputs { + + public static Output createLogOutput(boolean pretty) { + return new LogOutput(pretty); + } + + public static Output createOutput(OutputTypes types, OutputOptions options) { + return switch (types) { + case LOG -> new LogOutput(options.pretty()); + case PULSAR -> { + final PulsarOutputOptions pulsarOutputOptions = options.pulsarOptions(); + if (!pulsarOutputOptions.validate()) { + throw new IllegalArgumentException("unexpected pulsar options"); + } + yield new PulsarOutput(options.pulsarOptions()); + } + }; + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutput.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutput.java new file mode 100644 index 00000000..6bb65cc0 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutput.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import io.streamnative.oxia.client.perf.ycsb.WorkerException; +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.*; + +@Slf4j +final class PulsarOutput implements Output { + private final PulsarClient client; + private final Producer producer; + + public PulsarOutput(PulsarOutputOptions options) { + try { + this.client = + PulsarClient.builder() + .serviceUrl(options.serviceURL()) + .authentication(options.authenticationPlugin(), options.authenticationParams()) + .build(); + this.producer = + client + .newProducer(Schema.AVRO(BenchmarkReportSnapshot.class)) + .topic(options.targetTopic()) + .create(); + } catch (PulsarClientException ex) { + throw new WorkerException(ex); + } + } + + @Override + public void report(BenchmarkReportSnapshot report) { + try { + final MessageId send = producer.send(report); + log.info("output to pulsar success, the response message id: {}", send); + } catch (Throwable e) { + throw new WorkerException(e); + } + } + + @Override + public void close() throws IOException { + if (client != null) { + client.close(); + } + } +} diff --git a/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutputOptions.java b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutputOptions.java new file mode 100644 index 00000000..6f1bf309 --- /dev/null +++ b/perf-ycsb/src/main/java/io/streamnative/oxia/client/perf/ycsb/output/PulsarOutputOptions.java @@ -0,0 +1,32 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.perf.ycsb.output; + +import static com.google.common.base.Strings.*; + +public record PulsarOutputOptions( + String serviceURL, + String targetTopic, + String authenticationPlugin, + String authenticationParams) { + + public boolean validate() { + if (isNullOrEmpty(serviceURL) || isNullOrEmpty(targetTopic)) { + return false; + } + return isNullOrEmpty(authenticationPlugin) || !isNullOrEmpty(authenticationParams); + } +} diff --git a/pom.xml b/pom.xml index 97f75c1c..d7191c9d 100644 --- a/pom.xml +++ b/pom.xml @@ -499,5 +499,11 @@ + + perf-ycsb + + perf-ycsb + +