diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 450aaa76..e6a4dbec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,10 +32,14 @@ jobs: run: | cd betree/tests cargo build --tests + cargo build --tests --release cargo clean --package betree-tests + cargo clean --release --package betree-tests cd .. cargo build --tests + cargo build --tests --release cargo clean --package betree_storage_stack + cargo clean --release --package betree_storage_stack betree-integration: name: Integration Tests needs: dependencies @@ -134,7 +138,8 @@ jobs: with: path: | ~/.cargo - key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-msrv + target + key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-${{ hashFiles('**/Cargo.toml') }}-msrv - name: Prepare JULEA run: | sudo apt update || true @@ -174,14 +179,15 @@ jobs: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y . "${HOME}/.cargo/env" echo "RUSTC_VERSION=$(rustc --version | grep --only-matching '[0-9]\+\.[0-9]\+\.[0-9]\+' | head --lines=1)" >> $GITHUB_ENV - - name: Cache fio + - name: Cache id: cache uses: actions/cache@v3 # Environment variables do not seem to work, use ~ instead. 
with: path: | ~/.cargo - key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-fio-haura + target + key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-${{ hashFiles('**/Cargo.toml') }} - name: Build betree run: | cd betree @@ -201,3 +207,61 @@ jobs: make fio export BETREE_CONFIG=$(realpath ./.ci/haura.json) ./fio --direct=1 --rw=write --bs=4M --ioengine=external:src/fio-engine-haura.o --numjobs=1 --name=iops-test-job --size=128M + haura-benchmarks: + name: Benchmark Compatibility Tests + runs-on: ubuntu-22.04 + timeout-minutes: 60 + needs: dependencies + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + persist-credentials: false + - name: Install Rust + run: | + rm --recursive --force "${HOME}/.cargo" "${HOME}/.rustup" + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + . "${HOME}/.cargo/env" + echo "RUSTC_VERSION=$(rustc --version | grep --only-matching '[0-9]\+\.[0-9]\+\.[0-9]\+' | head --lines=1)" >> $GITHUB_ENV + - name: Common Cache + uses: actions/cache@v3 + with: + path: | + ~/.cargo + target + key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-${{ hashFiles('**/Cargo.toml') }} + - name: Poetry and Data Cache + id: cache + uses: actions/cache@v3 + # Environment variables do not seem to work, use ~ instead. 
+ with: + path: | + ~/.cache/pip + ~/.cache/pypoetry + betree/haura-benchmarks/data + betree/haura-benchmarks/haura-plots/poetry.lock + key: ubuntu-22.04-rustc-${{ env.RUSTC_VERSION }}-${{ hashFiles('**/Cargo.toml') }}-${{ hashFiles('**/pyproject.toml')}}-benchmarks + - name: Build haura-benchmarks + run: | + cd betree/haura-benchmarks + cargo build --release + - name: Prepare scripts and configuration + run: | + cd betree/haura-benchmarks + cp example_config/.ci-config.json perf-config.json + echo "ci" >> run.sh + - name: Run benchmark smoke test + run: | + cd betree/haura-benchmarks + ./run.sh smoke-test + - name: Prepare poetry for plots + run: | + sudo apt update || true + sudo apt --yes --no-install-recommends install python3-poetry + cd betree/haura-benchmarks/haura-plots + poetry install + - name: Run plots + run: | + cd betree/haura-benchmarks/haura-plots + poetry run plots ../results/*/* + rm -rf results diff --git a/betree/haura-benchmarks/.gitignore b/betree/haura-benchmarks/.gitignore new file mode 100644 index 00000000..b7f53e6e --- /dev/null +++ b/betree/haura-benchmarks/.gitignore @@ -0,0 +1,8 @@ +**/target +**/*.rs.bk +**/*.swp +heaptrack* +Cargo.lock + +results +data diff --git a/betree/haura-benchmarks/Cargo.toml b/betree/haura-benchmarks/Cargo.toml new file mode 100644 index 00000000..dd821acd --- /dev/null +++ b/betree/haura-benchmarks/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "betree-perf" +version = "0.1.0" +authors = ["tilpner "] +edition = "2018" + +[workspace] +members = ["."] + +[dependencies] +betree_storage_stack = { path = "..", features = ["experimental-api"]} + +structopt = "0.3" +figment = { version = "0.10", features = [ "json" ] } +serde_json = "1" +libmedium = "0.7" +procfs = "0.16" +rand = "0.8" +rand_xoshiro = "0.6" +crossbeam = "0.8" +jemallocator = { version = "0.5", features = ["background_threads"] } +log = "0.4" + +# Dependent on versions from haura +parking_lot = "0.11" +zip = "0.5" diff --git 
a/betree/haura-benchmarks/README.md b/betree/haura-benchmarks/README.md new file mode 100644 index 00000000..c20018a9 --- /dev/null +++ b/betree/haura-benchmarks/README.md @@ -0,0 +1,44 @@ +# betree-perf + +This directory contains some additional tools and benchmarks which can be helpful when assessing the performance + +- `src/bin/{json-flatten,json-merge,sysinfo-log}.rs`: Tooling to aggregate multiple newline-delimited JSON streams into one final file +- `src/lib.rs`: Shared setup between benchmarks +- `src/main.rs`: CLI to select and configure a benchmark, also spawns the sysinfo-log binary +- `src/{ingest, rewrite, switchover, tiered1, zip, scientific_evaluation, filesystem, filesystem_zip, checkpoints}.rs`: Individual benchmarks +- `run.sh`: Example usage, runs benchmarks with different configurations + +## Configuration + +All benchmark invocations can be seen in the `run.sh` script, which can be used +to create a custom benchmark run. Benchmarks are represented by their own +function you can uncomment at the bottom of the script. + +If you have followed the general scripts to setup `bectl` and `haura` in the +[documentation](https://parcio.github.io/haura/) you are good to go. Otherwise, +provide a configuration for the benchmarks either by pointing to a valid +configuration in the `BETREE_CONFIG` environment variable or by creating a +`perf-config.json` in the `haura-benchmarks` directory. A collection of example +configurations can be found in the `example_config` directory. + +``` sh +$ # EITHER +$ export BETREE_CONFIG= +$ # OR +$ cp example_config/example-config.json perf-config.json +``` + +Be sure to modify the example config, if chosen, to your desired specification. + + +## Running the benchmark + +If you have configured your benchmarks *and* chosen a configuration for Haura, +you can start the benchmark. 
If required for identification of multiple runs a +name can be given with each invocation which will be used in the stored results: + +``` sh +$ ./run.sh my-benchmark-run +``` + +After each individual benchmark an idle period of 1 minute is done by default. diff --git a/betree/haura-benchmarks/example_config/.ci-config.json b/betree/haura-benchmarks/example_config/.ci-config.json new file mode 100644 index 00000000..7821520e --- /dev/null +++ b/betree/haura-benchmarks/example_config/.ci-config.json @@ -0,0 +1,38 @@ +{ + "storage": { + "tiers": [ + { + "top_level_vdevs": [ + { + "mem": 2147483648 + } + ], + "preferred_access_type": "Unknown" + } + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 0 + ], + [ + 0 + ], + [ + 0 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 4294967296, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": 1000, + "migration_policy": null, + "metrics": null +} diff --git a/betree/haura-benchmarks/example_config/example-config.json b/betree/haura-benchmarks/example_config/example-config.json new file mode 100644 index 00000000..ace8b289 --- /dev/null +++ b/betree/haura-benchmarks/example_config/example-config.json @@ -0,0 +1,47 @@ +{ + "storage": { + "tiers": [ + { + "top_level_vdevs": [ + { + "mem": 4294967296 + } + ], + "preferred_access_type": "Unknown" + }, + { + "top_level_vdevs": [ + { + "path": "/tmp/example_disk", + "direct": true + } + ], + "preferred_access_type": "Unknown" + } + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 0 + ], + [ + 0 + ], + [ + 0 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 268435456, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": null, + "migration_policy": null, + "metrics": null +} diff --git 
a/betree/haura-benchmarks/example_config/new-config-w-lfu.json b/betree/haura-benchmarks/example_config/new-config-w-lfu.json new file mode 100644 index 00000000..af9b92e7 --- /dev/null +++ b/betree/haura-benchmarks/example_config/new-config-w-lfu.json @@ -0,0 +1,63 @@ +{ + "storage": { + "tiers": [ + { + "top_level_vdevs": [ + { + "mem": 16106127360 + } + ], + "preferred_access_type": "Unknown" + }, + { + "top_level_vdevs": [ + { + "path": "/tmp/disk_a", + "direct": true + } + ], + "preferred_access_type": "Unknown" + } + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 1 + ], + [ + 2 + ], + [ + 3 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 4294967296, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": 1000, + "migration_policy": { + "Lfu": { + "grace_period": { + "secs": 0, + "nanos": 0 + }, + "migration_threshold": 0.9, + "update_period": { + "secs": 1, + "nanos": 0 + }, + "policy_config": { + "promote_num": 99999, + "promote_size": 128 + } + } + }, + "metrics": null +} diff --git a/betree/haura-benchmarks/example_config/new-config-w-rl.json b/betree/haura-benchmarks/example_config/new-config-w-rl.json new file mode 100644 index 00000000..e1f65a20 --- /dev/null +++ b/betree/haura-benchmarks/example_config/new-config-w-rl.json @@ -0,0 +1,60 @@ +{ + "storage": { + "tiers": [ + { + "top_level_vdevs": [ + { + "mem": 16106127360 + } + ], + "preferred_access_type": "Unknown" + }, + { + "top_level_vdevs": [ + { + "path": "/tmp/disk_a", + "direct": true + } + ], + "preferred_access_type": "Unknown" + } + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 1 + ], + [ + 2 + ], + [ + 3 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 4294967296, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": 1000, + "migration_policy": { + 
"ReinforcementLearning": { + "grace_period": { + "secs": 0, + "nanos": 0 + }, + "migration_threshold": 0.8, + "update_period": { + "secs": 1, + "nanos": 0 + }, + "policy_config": null + } + }, + "metrics": null +} diff --git a/betree/haura-benchmarks/example_config/new-config-wo-migration.json b/betree/haura-benchmarks/example_config/new-config-wo-migration.json new file mode 100644 index 00000000..f3954f79 --- /dev/null +++ b/betree/haura-benchmarks/example_config/new-config-wo-migration.json @@ -0,0 +1,47 @@ +{ + "storage": { + "tiers": [ + { + "top_level_vdevs": [ + { + "mem": 16106127360 + } + ], + "preferred_access_type": "Unknown" + }, + { + "top_level_vdevs": [ + { + "path": "/tmp/disk_a", + "direct": true + } + ], + "preferred_access_type": "Unknown" + } + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 1 + ], + [ + 2 + ], + [ + 3 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 4294967296, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": 1000, + "migration_policy": null, + "metrics": null +} diff --git a/betree/haura-benchmarks/example_config/v0.2.0-config.json b/betree/haura-benchmarks/example_config/v0.2.0-config.json new file mode 100644 index 00000000..6006a7e5 --- /dev/null +++ b/betree/haura-benchmarks/example_config/v0.2.0-config.json @@ -0,0 +1,33 @@ +{ + "storage": { + "tiers": [ + [{ + "mem": 85899345920 + }], + ["/tmp/disk_a"] + ], + "queue_depth_factor": 20, + "thread_pool_size": null, + "thread_pool_pinned": false + }, + "alloc_strategy": [ + [ + 0 + ], + [ + 1 + ], + [ + 2 + ], + [ + 3 + ] + ], + "default_storage_class": 0, + "compression": "None", + "cache_size": 268435456, + "access_mode": "AlwaysCreateNew", + "sync_interval_ms": 1000, + "metrics": null +} diff --git a/betree/haura-benchmarks/haura-plots/.gitignore b/betree/haura-benchmarks/haura-plots/.gitignore new file mode 100644 index 00000000..bee8a64b --- 
/dev/null +++ b/betree/haura-benchmarks/haura-plots/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/betree/haura-benchmarks/haura-plots/README.md b/betree/haura-benchmarks/haura-plots/README.md new file mode 100644 index 00000000..9806ea14 --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/README.md @@ -0,0 +1,43 @@ +# haura-plots + +This directory contains some python scripts to visualize the benchmark results +of the provided scenarios. + +## Install + +You require `python3` and `poetry` (https://python-poetry.org/docs/) to run +these scripts. + +Install poetry if not already present: + +``` sh +# Fedora, RHEL, ... +$ sudo dnf install poetry +# Ubuntu +$ sudo apt update +$ sudo apt install python3-poetry +# Alpine +$ apk update +$ apk add poetry +# Or checkout their webpage https://python-poetry.org/docs/#installation +``` + +If poetry is up and running install the required dependencies: + +``` sh +$ poetry install +``` + +### Animations + +When creating animated plots like the object distribution for policies `ffmpeg` is required with the `libx264` codec. 
You can check your `ffmpeg` codecs with + +``` sh +$ ffmpeg -codecs | grep libx264 +``` + +## Usage + +``` sh +$ poetry run plots +``` diff --git a/betree/haura-benchmarks/haura-plots/create_animation.sh b/betree/haura-benchmarks/haura-plots/create_animation.sh new file mode 100755 index 00000000..6ccb71df --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/create_animation.sh @@ -0,0 +1,10 @@ +#!/bin/env bash + +pushd $1 + +if [ -e plot_timestep_000.png ] +then + ffmpeg -framerate 2 -i plot_timestep_%03d.png -c:v libx264 -pix_fmt yuv420p plot_timestep.mp4 +fi + +popd diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py b/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py new file mode 100755 index 00000000..26ea2d3d --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/haura_plots/__init__.py @@ -0,0 +1,268 @@ +#!/bin/env python +import json +import sys +import os +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import matplotlib.ticker as ticker +import matplotlib.cm as cm +import matplotlib.colors as mat_col +import matplotlib +import subprocess + +from . import util +from . import metrics_plots +from . 
import cache_plots + +def sort_by_o_id(key): + """ + Access string subslice and first tuple member + """ + return int(key[0][2:]) + +def plot_object_distribution(path): + """ + Plot colorcoded grids to show object distribution + """ + data = [] + if not os.path.exists(f"{path}/tier_state.jsonl"): + return + + with open(f"{path}/tier_state.jsonl", 'r', encoding='UTF-8') as state_file: + data = util.read_jsonl(state_file) + colors = { + 0: util.WHITE, + 1: util.GREEN, + 2: util.YELLOW, + 3: util.BLUE, + } + cmap = mat_col.ListedColormap([x[1] for x in colors.items()]) + labels = np.array(["Not present", "Fastest", "Fast", "Slow"]) + num_ts = 0 + # three groups fixed + mean_group_vals = [[], [], []] + for current_timestep in data: + # Read all names and order + # Iterate over each tier and add keys to known keys + keys = [] # Vec<(key, num_tier)> + num_tier = 1 + for tier in current_timestep: + for obj in tier["files"]: + keys.append((obj, num_tier)) + num_tier += 1 + + keys.sort(key=sort_by_o_id) + + # old boundaries update when needed + # seldom accessed 1-2000 (45x45) + # barely accessed 2001-2300 (18x18) + # often accessed 2301-2320 (5x5) + group_1 = [n[1] for n in keys[:4030]] + group_2 = [n[1] for n in keys[4030:4678]] + group_3 = [n[1] for n in keys[4678:4728]] + + # Reshape to matrix and fill with zeros if necessary + group_1 = np.concatenate((np.array(group_1), np.zeros(4096 - len(group_1)))).reshape((64,64)) + group_2 = np.concatenate((np.array(group_2), np.zeros(676 - len(group_2)))).reshape((26,26)) + group_3 = np.concatenate((np.array(group_3), np.zeros(64 - len(group_3)))).reshape((8,8)) + + num_group = 0 + fig, axs = plt.subplots(1, 4, figsize=(20,5)) + for group in [group_1, group_2, group_3]: + subax = axs[num_group] + mean = group[group > 0].mean() + mean_group_vals[num_group].append(mean) + subax.set_title(f"Object mean level: {mean}") + subax.tick_params(color="white") + num_group += 1 + im = subax.imshow(group, cmap=cmap) + im.set_clim(0, 3) + 
subax.yaxis.set_ticks([]) + subax.xaxis.set_ticks([]) + #divider = make_axes_locatable(subax) + #cax = divider.append_axes("right", size="5%", pad=0.05) + #fig.colorbar(im, cax=cax) + fmt = matplotlib.ticker.FuncFormatter(lambda x, pos: labels[x]) + ticks = [0, 1, 2, 3] + fig.colorbar(cm.ScalarMappable(cmap=cmap, norm=mat_col.NoNorm()), format=fmt, ticks=ticks) + + # Plot response times if available + if 'reqs' in current_timestep[0]: + times = [] + num_tiers = 0 + for tier in current_timestep: + num_tiers += 1 + resp_times = 0 + total = 0 + for o_id in tier["reqs"]: + resps = tier["reqs"][f"{o_id}"] + size = tier["files"][f"{o_id}"][1] + for resp in resps: + total += 1 + resp_times += resp["response_time"]["nanos"] / size + if total != 0: + times.append(resp_times / total) + else: + times.append(0) + x_ticks = np.arange(0, num_tiers) + width = 0.35 + # convert from nanos to millis + axs[3].bar(x_ticks, np.array(times) / 1000000, width, label='Access latency', hatch=['.', '+', '/'], color='white', edgecolor='black') + axs[3].set_title('Mean access latency for timestep') + axs[3].set_ylabel('Mean latency in ms') + #axs[3].set_ylim(0, 100) + axs[3].set_xticks(x_ticks, labels=["Fastest", "Fast", "Slow"]) + + fig.savefig(f"{path}/plot_timestep_{num_ts:0>3}.png") + matplotlib.pyplot.close(fig) + num_ts += 1 + + fig, ax = plt.subplots(figsize=(10,5)) + ax.plot(mean_group_vals[0], color=util.ORANGE, label="Seldomly Accessed Group", marker="o", markevery=10); + ax.plot(mean_group_vals[1], color=util.LIGHT_BLUE, label="Occassionally Accessed", marker="s", markevery=10); + ax.plot(mean_group_vals[2], color=util.RED, label="Often Accessed", marker="^", markevery=10); + # we might want to pick the actual timestamps for this + ax.set_xlabel("Timestep") + ax.set_ylabel("Mean object tier") + ax.set_title("Mean tier of all object groups over time") + ax.set_ylim((1,3)) + pls_no_cut_off = ax.legend(bbox_to_anchor=(1.0,1.0), loc="upper left") + 
fig.savefig(f"{path}/plot_timestep_means.svg", bbox_extra_artists=(pls_no_cut_off,), bbox_inches='tight') + # Create animation + subprocess.run(["./create_animation.sh", path], check=True) + +# TODO: Adjust bucket sizes +def size_buckets(byte): + if byte <= 64000: + return 64000 + elif byte <= 256000: + return 256000 + elif byte <= 1000000: + return 1000000 + elif byte <= 4000000: + return 4000000 + else: + return 1000000000 + +def bytes_to_lexical(byte): + if byte >= 1000000: + return f"{byte/1000/1000}MB" + return f"{byte/1000}KB" + +def plot_filesystem_test(): + dat = pd.read_csv(f"{sys.argv[1]}/filesystem_measurements.csv") + # groups + fig, axs = plt.subplots(2,3, figsize=(15,5)) + min_read = 99999999999999999 + min_write = 99999999999999999 + max_read = 0 + max_write = 0 + for n in range(3): + sizes = dat[dat['group'] == n]['size'].to_numpy() + reads = {} + reads_raw = dat[dat['group'] == n]['read_latency_ns'].to_numpy() + writes = {} + writes_raw = dat[dat['group'] == n]['write_latency_ns'].to_numpy() + for (idx, size) in enumerate(sizes): + if size_buckets(size) not in reads: + reads[size_buckets(size)] = [] + reads[size_buckets(size)].append(reads_raw[idx]) + if size_buckets(size) not in writes: + writes[size_buckets(size)] = [] + writes[size_buckets(size)].append(writes_raw[idx]) + + sorted_sizes = list(reads) + sorted_sizes.sort() + labels = [] + reads_plot = [] + writes_plot = [] + for size in sorted_sizes: + labels.append(bytes_to_lexical(size)) + a = np.array(reads[size]) / 1000 + min_read = min(min_read, a.min()) + max_read = max(max_read, a.max()) + reads_plot.append(a) + b = np.array(writes[size]) / 1000 + min_write = min(min_write, b.min()) + max_write = max(max_write, b.max()) + writes_plot.append(b) + axs[0][n].boxplot(reads_plot, vert=True, labels=labels) + axs[0][n].set_yscale('log') + match n: + case 0: + axs[0][n].set_title("Seldomly Accessed") + case 1: + axs[0][n].set_title("Occassionally Accessed") + case 2: + axs[0][n].set_title("Often 
Accessed") + axs[0][n].set_ylabel("Read latency (μs)") + axs[1][n].boxplot(writes_plot, vert=True, labels=labels) + axs[1][n].set_yscale('log') + axs[1][n].set_ylabel("Write latency (μs)") + + for n in range(3): + axs[0][n].set_ylim(min(min_read, min_write),max_read + 10000000) + axs[1][n].set_ylim(min(min_read, min_write),max_write + 10000000) + + fig.savefig(f"{sys.argv[1]}/filesystem_comp.svg") + plt.close(fig) + + +def plot_evaluation_latency(path, variant): + if not os.path.exists(f"{path}/evaluation_{variant}.csv"): + return + + data = pd.read_csv(f"{path}/evaluation_{variant}.csv"); + + fig, ax = plt.subplots(1,1,figsize=(6,4)) + reads = data[data['op'] == 'r'] + writes = data[data['op'] == 'w'] + ax.scatter(reads['size'], reads['latency_ns'], marker='x', label="read") + ax.scatter(writes['size'], writes['latency_ns'], marker='.', label="write") + xticks = np.arange(0, 12 * 1024 * 1024 + 1, 2 * 1024 * 1024) + ax.set_xticks(xticks, [int(x / 1024) for x in xticks]) + ax.set_xlabel("Size in KiB") + ax.set_ylabel("Latency in ns") + ax.set_yscale("log") + label=' | '.join(path.split('/')[-2:]) + ax.set_title(f"Haura - {label}") + pls_no_cut_off = ax.legend(bbox_to_anchor=(1.0,1.0), loc="upper left") + fig.savefig( + f"{path}/evaluation_{variant}.svg", + bbox_extra_artists=(pls_no_cut_off,), + bbox_inches='tight' + ) + plt.close(fig) + +USAGE_HELP="""Please specify an input run directory. If you already completed \ +benchmarks they can be found under `results/*`. 
+ +Usage: + haura-plots +""" + +def main(): + if len(sys.argv) < 2: + print(USAGE_HELP) + sys.exit(2) + data = [] + + # Prep the color scheme + util.init_colormap() + + for path in sys.argv[1:]: + with open(f"{path}/betree-metrics.jsonl", 'r', encoding="UTF-8") as metrics: + data = util.read_jsonl(metrics) + # Plot actions + metrics_plots.plot_throughput(data, path) + metrics_plots.plot_tier_usage(data, path) + plot_evaluation_latency(path, "read") + plot_evaluation_latency(path, "rw") + plot_object_distribution(path) + metrics_plots.plot_system(path) + cache_plots.plot_cache(data, path) + #plot_filesystem_test() + +if __name__ == "__main__": + main() diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/cache_plots.py b/betree/haura-benchmarks/haura-plots/haura_plots/cache_plots.py new file mode 100644 index 00000000..1f9346dd --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/haura_plots/cache_plots.py @@ -0,0 +1,61 @@ +""" +Plots visualizing cache statistics. +""" +from . 
import util +import numpy as np +import matplotlib.pyplot as plt + +def plot_cache(data, path): + """Plot cache statistics.""" + epoch = [temp['epoch_ms'] for temp in data] + util.subtract_first_index(epoch) + + fig, axs = plt.subplots(3,1, figsize=(10, 9)) + eticks = range(0, epoch[-1:][0], 30 * 10**3); + eticks_formatted = list(map(util.ms_to_string, eticks)) + + # Capacity vs Size (Peak Check) + cap = np.array([temp['cache']['capacity'] / 1024 / 1024 for temp in data]) + axs[0].plot(epoch, cap, label='capacity', linestyle=':') + axs[0].plot(epoch, [temp['cache']['size'] / 1024 / 1024 for temp in data], label='size') + axs[0].set_xticks(eticks, eticks_formatted) + axs[0].set_ylabel("Size [MiB]") + oax = axs[0].twinx() + elems = np.array([temp['cache']['len'] for temp in data]) + oax.plot(epoch, elems, label='entries', drawstyle='steps') + oax.set_ylim(top=elems.max() * 1.4) + oax.legend(bbox_to_anchor=(1.0, 1.2)) + oax.set_ylabel("# of entries") + axs[0].legend(ncols=2, bbox_to_anchor=(0.8, 1.2)) + + # Hits vs Misses (Keep one high, the other low) + hits = np.array(util.diff_window([temp['cache']['hits'] for temp in data])) + miss = np.array(util.diff_window([temp['cache']['misses'] for temp in data])) + axs[1].plot(epoch, hits, label='hits') + axs[1].plot(epoch, miss, label='misses') + axs[1].set_xticks(eticks, eticks_formatted) + axs[1].set_ylabel("# per 500ms") + oax = axs[1].twinx() + # this may be zero somewhere in there + with np.errstate(divide='ignore', invalid='ignore'): + oax.plot(epoch, hits / (hits + miss) * 100, label="Hit-Miss-Ratio", linestyle=':') + oax.set_ylabel("Hits [%]") + oax.legend(bbox_to_anchor=(1.0, 1.2)) + axs[1].legend(ncols=2, bbox_to_anchor=(0.8, 1.2)) + + # insertions (reads, new nodes, updates, writes) vs evictions (updates, reads) vs removals (updates) + axs[2].plot(epoch, util.diff_window([temp['cache']['insertions'] for temp in data]), label='insertions') + axs[2].plot(epoch, util.diff_window([temp['cache']['evictions'] for temp 
in data]), label='evictions') + axs[2].plot(epoch, util.diff_window([temp['cache']['removals'] for temp in data]), label='removals') + axs[2].set_xticks(eticks, eticks_formatted) + axs[2].set_ylabel("# per 500ms") + axs[2].legend(ncols=3, bbox_to_anchor=(1.0, 1.2)) + axs[2].set_xlabel("Time [m:s]") + + label=' | '.join(path.split('/')[-2:]) + fig.suptitle(f"Haura - {label}") + fig.tight_layout() + fig.savefig( + f"{path}/cache_stats.svg", + ) + plt.close(fig) diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/metrics_plots.py b/betree/haura-benchmarks/haura-plots/haura_plots/metrics_plots.py new file mode 100644 index 00000000..ebc6187c --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/haura_plots/metrics_plots.py @@ -0,0 +1,148 @@ +""" +Plots visualizing the metrics produced by Haura. +""" +from . import util +import numpy as np +import matplotlib.pyplot as plt + +def plot_throughput(data, path): + """ + Print a four row throughput plot with focussed read or write throughput. + """ + + epoch = [temp['epoch_ms'] for temp in data] + util.subtract_first_index(epoch) + epoch_formatted = list(map(util.ms_to_string, epoch)) + num_tiers = len(data[0]['storage']['tiers']) + fig, axs = plt.subplots(num_tiers, 1, figsize=(16,8)) + for tier_id in range(num_tiers): + for disk_id in range(len(data[0]['storage']['tiers'][tier_id]['vdevs'])): + writes = np.array([]) + reads = np.array([]) + for point in data: + writes = np.append(writes, point['storage']['tiers'][tier_id]['vdevs'][disk_id]['written']) + reads = np.append(reads, point['storage']['tiers'][tier_id]['vdevs'][disk_id]['read']) + + if len(writes) > 0: + util.subtract_last_index(writes) + util.subtract_last_index(reads) + + # convert to MiB from Blocks + # NOTE: We assume here a block size of 4096 bytes as this is the + # default haura block size if you change this you'll need to modify + # this here too. 
+ writes = writes * util.BLOCK_SIZE / 1024 / 1024 * (util.SEC_MS / util.EPOCH_MS) + reads = reads * util.BLOCK_SIZE / 1024 / 1024 * (util.SEC_MS / util.EPOCH_MS) + + axs[tier_id].plot(epoch, reads, label = 'Read', linestyle='dotted', color=util.GREEN) + axs[tier_id].plot(epoch, writes, label = 'Written', color=util.BLUE) + axs[tier_id].set_xlabel("runtime (minute:seconds)") + axs[tier_id].set_xticks(epoch, epoch_formatted) + axs[tier_id].locator_params(tight=True, nbins=10) + axs[tier_id].set_ylabel(f"{util.num_to_name(tier_id)}\nMiB/s (I/0)") + label=' | '.join(path.split('/')[-2:]) + fig.legend(loc="center right",handles=axs[0].get_lines()) + # Epoch in seconds + fig.suptitle(f"Haura - {label}", y=0.98) # add title + fig.savefig(f"{path}/plot_write.svg") + for tier_id in range(num_tiers): + lines = axs[tier_id].get_lines() + if len(lines) > 0: + lines[0].set_linestyle('solid') + lines[0].zorder = 2.1 + lines[1].set_linestyle('dotted') + lines[1].zorder = 2.0 + fig.legend(loc="center right",handles=axs[0].get_lines()) + fig.savefig(f"{path}/plot_read.svg") + plt.close(fig) + +def plot_tier_usage(data, path): + """ + Plot the utilized space of each storage tier. 
+ """ + fig, axs = plt.subplots(4, 1, figsize=(10,13)) + + # 0 - 3; Fastest - Slowest + free = [[], [], [], []] + total = [[], [], [], []] + # Map each timestep to an individual + for ts in data: + tier = 0 + for stat in ts["usage"]: + free[tier].append(stat["free"]) + total[tier].append(stat["total"]) + tier += 1 + + tier = 0 + for fr in free: + axs[tier].plot((np.array(total[tier]) - np.array(fr)) * 4096 / 1024 / 1024 / 1024, label="Used", marker="o", markevery=200, color=util.BLUE) + axs[tier].plot(np.array(total[tier]) * 4096 / 1024 / 1024 / 1024, label="Total", marker="^", markevery=200, color=util.GREEN) + axs[tier].set_ylim(bottom=0) + axs[tier].set_ylabel(f"{util.num_to_name(tier)}\nCapacity in GiB") + tier += 1 + + fig.legend(loc='center right',handles=axs[0].get_lines()) + fig.savefig(f"{path}/tier_usage.svg") + plt.close(fig) + + +def plot_system(path): + """Plot the system usage and temperatures during the run.""" + data = [] + with open(f"{path}/out.jsonl", 'r', encoding="UTF-8") as metrics: + data = util.read_jsonl(metrics) + + epoch = [temp['epoch_ms'] for temp in data] + util.subtract_first_index(epoch) + epoch_formatted = list(map(util.ms_to_string, epoch)) + min_pagefaults = [x["proc_minflt"] + x["proc_cminflt"] for x in data] + maj_pagefaults = [x["proc_majflt"] + x["proc_cmajflt"] for x in data] + virtual_mem = [x["proc_vsize"] for x in data] + resident_mem = [x["proc_rss"] for x in data] + utime = [x["proc_utime"] + x["proc_cutime"] for x in data] + stime = [x["proc_stime"] + x["proc_cstime"] for x in data] + + fig, axs = plt.subplots(3,2, figsize=(10, 10)) + eticks = range(0, epoch[-1:][0], 30 * 10**3); + eticks_formatted = list(map(util.ms_to_string, eticks)) + + # Page Faults (Minor) + axs[0][0].plot(epoch, min_pagefaults) + axs[0][0].set_ylabel("Minor Pagefaults (All threads)") + axs[0][0].set_xticks(eticks, eticks_formatted) + + # Page Faults (Major) + axs[1][0].plot(epoch, maj_pagefaults) + axs[1][0].set_ylabel("Major Pagefaults (All 
threads)") + axs[1][0].set_xticks(eticks, eticks_formatted) + + + # Show[0] in MiB + axs[2][0].plot(epoch, np.array(virtual_mem) / 1024 / 1024) + axs[2][0].set_ylabel("Virtual Memory [MiB]") + axs[2][0].set_xticks(eticks, eticks_formatted) + + + # Resident Memory + axs[2][1].plot(epoch, np.array(resident_mem)) + axs[2][1].set_ylabel("Resident Memory Pages [#]") + axs[2][1].set_xticks(eticks, eticks_formatted) + + # CPU time + axs[0][1].plot(epoch, utime, label="utime") + axs[0][1].plot(epoch, stime, label="stime") + axs[0][1].set_ylabel("time [s] (All threads)") + axs[0][1].set_xticks(eticks, eticks_formatted) + axs[0][1].legend(bbox_to_anchor=(1.35, 0.6)) + + + temps_keys = filter(lambda x: 'hwmon' in x and not 'Tccd' in x, data[0].keys()) + for (key, m) in zip(temps_keys, ['-', '--', '-.', ':']): + axs[1][1].plot(epoch, [x[key] for x in data], label=key, linestyle=m) + axs[1][1].set_xticks(eticks, eticks_formatted) + axs[1][1].set_ylabel("Temperature [C]") + axs[1][1].legend(bbox_to_anchor=(1.0, 0.6)) + + + fig.tight_layout() + fig.savefig(f"{path}/proc.svg") diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/policy_plots.py b/betree/haura-benchmarks/haura-plots/haura_plots/policy_plots.py new file mode 100644 index 00000000..da2ae0d0 --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/haura_plots/policy_plots.py @@ -0,0 +1,3 @@ +def plot_delta(data, path): + """TODO""" + return diff --git a/betree/haura-benchmarks/haura-plots/haura_plots/util.py b/betree/haura-benchmarks/haura-plots/haura_plots/util.py new file mode 100644 index 00000000..8ca9a46d --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/haura_plots/util.py @@ -0,0 +1,109 @@ +""" +Utility functions which may be used in multiple plotting types. 
+""" + +import json +import matplotlib.pyplot as plt +from cycler import cycler + +# Constants +BLOCK_SIZE=4096 +EPOCH_MS=500 +SEC_MS=1000 + +# For color reference of "Wong" color scheme see: +# https://davidmathlogic.com/colorblind/#%23000000-%23E69F00-%2356B4E9-%23009E73-%23F0E442-%230072B2-%23D55E00-%23CC79A7 +WHITE='#FFFFFF' +GREEN='#009E73' +YELLOW='#F0E442' +BLUE='#0072B2' +LIGHT_BLUE='#56B4E9' +RED='#D55E00' +ORANGE='#E69F00' + +MARKERS=['x', '.', '^', 'v', '<', '>'] + +def init_colormap(): + """Create the "Wong" color scheme and set it as matplotlib default.""" + wong = cycler(linestyle=['-', '--', '-.']) * cycler(color=[ + "#56B4E9", + "#E69F00", + "#009E73", + "#F0E442", + "#0072B2", + "#D55E00", + "#CC79A7", + "#000000", + ]) + plt.rc('axes', prop_cycle=wong) + +# Formatting +def ms_to_string(time): + """Nicer formatter for epoch strings in figures""" + return f"{int(time / 1000 / 60)}:{int(time / 1000) % 60:02d}" + + +def read_jsonl(file): + """ + Read from a file descriptor line by line a json, parse it, and return a list + of read objects. + """ + data = [] + while True: + # Get next line from file + line = file.readline() + # if line is empty + # end of file is reached + if not line: + break + json_object = json.loads(line) + data.append(json_object) + return data + +def subtract_last_index(array): + """ + From a list of numbers subtract the value of the previous entry from the + next. Operates in-place. + """ + last_val = 0 + for index, value in enumerate(array): + array[index] = value - last_val + last_val = value + array[0] = 0 + return array + +def diff_window(array): + """ + From a list of numbers store the diff between n-1 and n in n. + Operates in-place. + """ + last_val = 0 + for index, value in enumerate(array): + array[index] = value - last_val + last_val = value + return array + +def subtract_first_index(array): + """ + From a list of numbers subtract the first entry from all entries. Operates + in-place. 
+ """ + first_val = array[0] + for index, value in enumerate(array): + array[index] = value -first_val + return array + +def num_to_name(tier): + """Convert a number to the corresponding tier name in the storage + hierarchy.""" + match tier: + case 0: + return 'Fastest' + case 1: + return 'Fast' + case 2: + return 'Slow' + case 3: + return 'Slowest' + case _: + return '???' diff --git a/betree/haura-benchmarks/haura-plots/pyproject.toml b/betree/haura-benchmarks/haura-plots/pyproject.toml new file mode 100644 index 00000000..382b5ee1 --- /dev/null +++ b/betree/haura-benchmarks/haura-plots/pyproject.toml @@ -0,0 +1,21 @@ +[tool.poetry] +name = "haura-plots" +version = "0.1.0" +description = "Plotting haura benchmarks." +authors = ["Johannes Wünsche "] +license = "AGPL-3.0" + +[tool.poetry.dependencies] +python = "^3.10" +matplotlib = "^3.6.3" +numpy = "^1.24.1" +pandas = "^1.5.3" + +[tool.poetry.dev-dependencies] + +[tool.poetry.scripts] +plots = "haura_plots:main" + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/betree/haura-benchmarks/run.sh b/betree/haura-benchmarks/run.sh new file mode 100755 index 00000000..f6edbe72 --- /dev/null +++ b/betree/haura-benchmarks/run.sh @@ -0,0 +1,254 @@ +#!/usr/bin/env bash +# shellcheck disable=SC2030,SC2031 # we exploit this characteristic to start several test scenarios - merging them would lead to pollution + +function ensure_zip { + local url + url="https://cdn.kernel.org/pub/linux/kernel/v5.x/linux-5.15.58.tar.xz" + + if [ ! -e "$ZIP_ARCHIVE" ] + then + mkdir data + pushd data || exit + + curl "$url" -o linux.tar.xz + tar xf linux.tar.xz + rm linux.tar.xz + zip -0 -r linux.zip linux-* + rm -r linux-* + + popd || exit + fi +} + +function ensure_bectl { + pushd ../../bectl || exit + cargo build --release + popd || return +} + +function ensure_config { + if [ ! 
-e "$BETREE_CONFIG" ] + then + echo "No Haura configuration found at: ${BETREE_CONFIG}" + exit 1 + fi +} + +total_runs=0 + +function run { + local vdev_type="$1" + local name="$2" + local mode="$3" + shift 3 + + if [ "$total_runs" -gt 0 ] + then + sleep 60 + fi + total_runs=$((total_runs + 1)) + + local out_path + out_path="results/$(date -I)_${vdev_type}/${name}_$(date +%s)" + mkdir -p "$out_path" + + pushd "$out_path" || return + + echo "running $mode with these settings:" + env | grep BETREE__ + env > "env" + "$ROOT/../../target/release/bectl" config print-active > "config" + "$ROOT/target/release/betree-perf" "$mode" "$@" + + echo "merging results into $out_path/out.jsonl" + "$ROOT/target/release/json-merge" \ + --timestamp-key epoch_ms \ + ./betree-metrics.jsonl \ + ./proc.jsonl \ + ./sysinfo.jsonl \ + | "$ROOT/target/release/json-flatten" > "out.jsonl" + + popd || return +} + +function tiered() { + ( + export BETREE__ALLOC_STRATEGY='[[0],[0],[],[]]' + run "$RUN_IDENT" tiered1_all0_alloc tiered1 + ) + + ( + export BETREE__ALLOC_STRATEGY='[[0],[1],[],[]]' + run "$RUN_IDENT" tiered1_id_alloc tiered1 + ) + + ( + export BETREE__ALLOC_STRATEGY='[[1],[1],[],[]]' + run "$RUN_IDENT" tiered1_all1_alloc tiered1 + ) +} + +function scientific_evaluation() { + # Invocation: + run "$RUN_IDENT" random_evaluation_read evaluation-read 30 $((25 * 1024 * 1024 * 1024)) $((8192)) $((1 * 1024)) $((12 * 1024 * 1024)) +} + +function evaluation_rw() { + # Invocation: + run "$RUN_IDENT" random_evaluation_rw evaluation-rw 30 $((25 * 1024 * 1024 * 1024)) $((8192)) $((1 * 1024)) $((12 * 1024 * 1024)) +} + +function filesystem_zip() { + export BETREE__ALLOC_STRATEGY='[[0],[1],[2],[]]' + run "$RUN_IDENT" file_system_three "$ZIP_ARCHIVE" +} + +function checkpoints() { + export BETREE__ALLOC_STRATEGY='[[0, 1],[1],[],[]]' + run "$RUN_IDENT" checkpoints_fastest checkpoints +} + +function filesystem() { + export BETREE__ALLOC_STRATEGY='[[0],[1],[2],[]]' + run "$RUN_IDENT" file_system_three 
filesystem +} + +function zip_cache() { + local F_CD_START=1040032667 + + for cache_mib in 32 128 512 2048; do + ( + export BETREE__CACHE_SIZE=$((cache_mib * 1024 * 1024)) + run "$RUN_IDENT" "zip_cache_$cache_mib" zip 4 100 10 "$ZIP_ARCHIVE" "$F_CD_START" + ) + done +} + +function zip_mt() { + local F="$PWD/data/linux.zip" + local F_CD_START=1 + + for cache_mib in 256 512 1024 2048; do + echo "using $cache_mib MiB of cache" + ( + export BETREE__CACHE_SIZE=$((cache_mib * 1024 * 1024)) + + local total=10000 + + for num_workers in 1 2 3 4 5 6 7 8 9 10; do + echo "running with $num_workers workers" + local per_worker=$((total / num_workers)) + local per_run=$((per_worker / 10)) + + run "$RUN_IDENT" "zip_mt_${cache_mib}_${num_workers}_${per_run}_10" zip "$num_workers" "$per_run" 10 "$F" "$F_CD_START" + done + ) + done +} + +function zip_tiered() { + local F_CD_START=1 #242415017 #1040032667 + # for cache_mib in 256 512 1024; do + for cache_mib in 32 64; do + echo "using $cache_mib MiB of cache" + ( + export BETREE__CACHE_SIZE=$((cache_mib * 1024 * 1024)) + + local total=10000 + + for num_workers in 1 2 3 4 5 6 7 8; do + echo "running with $num_workers workers" + local per_worker=$((total / num_workers)) + local per_run=$((per_worker / 10)) + + ( + export BETREE__ALLOC_STRATEGY='[[0],[0],[],[]]' + run "$RUN_IDENT" "zip_tiered_all0_${cache_mib}_${num_workers}_${per_run}_10" zip "$num_workers" "$per_run" 10 "$ZIP_ARCHIVE" "$F_CD_START" + ) + + ( + export BETREE__ALLOC_STRATEGY='[[0],[1],[],[]]' + run "$RUN_IDENT" "zip_tiered_id_${cache_mib}_${num_workers}_${per_run}_10" zip "$num_workers" "$per_run" 10 "$ZIP_ARCHIVE" "$F_CD_START" + ) + + ( + export BETREE__ALLOC_STRATEGY='[[1],[1],[],[]]' + run "$RUN_IDENT" "zip_tiered_all1_${cache_mib}_${num_workers}_${per_run}_10" zip "$num_workers" "$per_run" 10 "$ZIP_ARCHIVE" "$F_CD_START" + ) + + done + ) + done +} + +function ingest() { + ( + ( + export BETREE__COMPRESSION="None" + run "$RUN_IDENT" ingest_hdd_none ingest 
"$ZIP_ARCHIVE" + ) + + for level in $(seq 1 16); do + ( + export BETREE__COMPRESSION="{ Zstd = { level = $level } }" + run "$RUN_IDENT" "ingest_hdd_zstd_$level" ingest "$ZIP_ARCHIVE" + ) + done + ) +} + +function switchover() { + run "$RUN_IDENT" switchover_tiny switchover 32 "$((32 * 1024 * 1024))" + run "$RUN_IDENT" switchover_small switchover 8 "$((128 * 1024 * 1024))" + run "$RUN_IDENT" switchover_medium switchover 4 "$((2 * 1024 * 1024 * 1024))" + run "$RUN_IDENT" switchover_large switchover 4 "$((8 * 1024 * 1024 * 1024))" +} + +function ci() { + run "$RUN_IDENT" switchover_small switchover 4 "$((128 * 1024 * 1024))" +} + +cargo build --release + +if [ -z "$BETREE_CONFIG" ] +then + export BETREE_CONFIG="$PWD/perf-config.json" +fi + +export ROOT="$PWD" +export ZIP_ARCHIVE="$PWD/data/linux.zip" +# Category under which the default runs should be made, a function may modify +# this if multiple categories are needed. +export RUN_IDENT="default" + +if [ "$1" == "-h" ] || [ "$1" == "--help" ] || [ "$1" = "help" ] +then + echo "Usage:" + echo " $0 [identifier]" + exit 0 +fi + +if [ -n "$*" ] +then + export RUN_IDENT=$* +fi + +ensure_bectl +ensure_zip +ensure_config + +# Uncomment the scenarios which you want to run. Assure that the used +# configuration is valid for the scenario as some of them require a minimum +# amount of tiers. + +#zip_cache +#zip_tiered +#zip_mt +#tiered +#scientific_evaluation +#evaluation_rw +#filesystem +#filesystem_zip +#checkpoints +#switchover +#ingest diff --git a/betree/haura-benchmarks/src/bin/json-flatten.rs b/betree/haura-benchmarks/src/bin/json-flatten.rs new file mode 100644 index 00000000..705331de --- /dev/null +++ b/betree/haura-benchmarks/src/bin/json-flatten.rs @@ -0,0 +1,56 @@ +//! json-flatten collapses each newline-delimited JSON document from stdin according to the following rules: +//! +//! - { "foo": { "bar": 42 } } => { "foo_bar": 42 } +//! - { "foo": [ 1, 2, 3 ] } => { "foo_0": 1, "foo_1": 2, "foo_2": 3 } +//! 
- { "foo": [ { "bar": 1, "quux": 2 }, { "bar": 3 } ] } +//! => { "foo_0_bar": 1, "foo_0_quux": 2, "foo_1_bar": 3 } + +use std::io::{self, Write}; + +use serde_json::{Deserializer, Map, Value}; + +fn flatten_into(out: &mut Map, prefix: Option<&str>, value: &Value) { + match value { + Value::Array(arr) => { + for (idx, item) in arr.iter().enumerate() { + let prefix = prefix + .map(|p| format!("{}_{}", p, idx)) + .unwrap_or_else(|| idx.to_string()); + + flatten_into(out, Some(&prefix), item); + } + } + Value::Object(map) => { + for (key, value) in map.iter() { + let prefix = prefix + .map(|p| format!("{}_{}", p, key)) + .unwrap_or_else(|| key.clone()); + + flatten_into(out, Some(&prefix), value); + } + } + val => { + out.insert(String::from(prefix.unwrap()), val.clone()); + } + } +} + +fn main() -> io::Result<()> { + let stdin = io::stdin(); + let stdin = stdin.lock(); + + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + + let mut items = Deserializer::from_reader(stdin).into_iter(); + + while let Some(Ok(Value::Object(map))) = items.next() { + let mut flat = Map::new(); + flatten_into(&mut flat, None, &Value::Object(map)); + + serde_json::to_writer(&mut stdout, &Value::Object(flat))?; + writeln!(&mut stdout)?; + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/bin/json-merge.rs b/betree/haura-benchmarks/src/bin/json-merge.rs new file mode 100644 index 00000000..f4b6cb8d --- /dev/null +++ b/betree/haura-benchmarks/src/bin/json-merge.rs @@ -0,0 +1,105 @@ +use std::{ + fs::File, + io::{self, Write}, + path::{Path, PathBuf}, +}; + +use serde_json::{Deserializer, Map, Value}; +use structopt::StructOpt; + +/// json-merge merges newline-delimited JSON documents from multiple sources: +/// +/// - read one document from primary stream, containing timestamp t +/// - read second document from primary stream, containing timestamp t' +/// - read all documents from secondary streams while their timestamps are between t (inclusive) and t' (exclusive) +/// - 
shallowly merge all accumulated documents into one +/// - output resulting document to stdout with timestamp t +/// - set t = t' +/// - repeat process from second step +#[derive(StructOpt)] +struct Opts { + primary: PathBuf, + secondary: Vec, + #[structopt(long)] + timestamp_key: String, +} + +fn open_stream(p: impl AsRef) -> io::Result>> { + let f = File::open(p.as_ref())?; + let iter = Deserializer::from_reader(f).into_iter::(); + Ok(iter) +} + +fn extract_timestamp(key: &str, v: &Map) -> Option { + v.get(key).and_then(|v| v.as_u64()) +} + +fn merge_into(mut dst: Map, src: Map) -> Map { + for (k, v) in src { + dst.insert(k.clone(), v.clone()); + } + + dst +} + +fn main() -> io::Result<()> { + let cfg = Opts::from_args(); + + let mut primary = open_stream(&cfg.primary)?; + let mut secondaries = cfg + .secondary + .iter() + .map(|path| open_stream(path).map(Iterator::peekable)) + .collect::>>()?; + + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + + let mut lower = match primary.next() { + Some(Ok(Value::Object(lower))) => lower, + Some(Ok(_not_object)) => panic!("Value is not an object!"), + Some(Err(e)) => panic!("Couldn't read first object, {}", e), + None => return Ok(()), // primary is empty + }; + let mut acc = Vec::new(); + + while let Some(Ok(Value::Object(upper))) = primary.next() { + let t = extract_timestamp(&cfg.timestamp_key, &lower); + let t2 = extract_timestamp(&cfg.timestamp_key, &upper); + + acc.clear(); + + for secondary in &mut secondaries { + 'process_sec: while let Some(Ok(Value::Object(map))) = secondary.peek() { + let ts = extract_timestamp(&cfg.timestamp_key, map); + if ts < t { + // secondary is too early, skip this + let _ = secondary.next(); + continue 'process_sec; + } else if ts < t2 { + // between t and t2, accumulate + if let Some(Ok(Value::Object(map))) = secondary.next() { + acc.push(map); + } else { + unreachable!() + } + } else { + // too far ahead, go to next stream + break 'process_sec; + } + } + } + + let merged 
= acc.drain(..).fold(lower, |acc, mut x| { + x.remove(&cfg.timestamp_key); + merge_into(acc, x) + }); + + serde_json::to_writer(&mut stdout, &Value::Object(merged))?; + writeln!(&mut stdout)?; + + lower = upper; + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/bin/sysinfo-log.rs b/betree/haura-benchmarks/src/bin/sysinfo-log.rs new file mode 100644 index 00000000..ab909a74 --- /dev/null +++ b/betree/haura-benchmarks/src/bin/sysinfo-log.rs @@ -0,0 +1,73 @@ +use std::{ + fs::File, + io::{self, BufWriter, Write}, + path::PathBuf, + thread, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use libmedium::{ + hwmon::Hwmons, + sensors::{temp::TempSensor, Sensor}, +}; +use serde_json::{Map, Number, Value}; +use structopt::StructOpt; + +#[derive(StructOpt)] +struct Opts { + #[structopt(long)] + output: PathBuf, + #[structopt(long)] + interval_ms: u64, +} + +fn gather() -> Map { + let mut map = Map::new(); + + if let Ok(hwmons) = Hwmons::parse() { + let mut hwmons_map = Map::new(); + + for hwmon in hwmons.into_iter() { + let mut hwmon_map = Map::new(); + for (_name, tempsensor) in hwmon.temps() { + if let Ok(temp) = tempsensor.read_input() { + let celsius = Number::from_f64(temp.as_degrees_celsius()) + .expect("Invalid temperature (NaN/infinity)"); + hwmon_map.insert(tempsensor.name(), Value::Number(celsius)); + } + } + + hwmons_map.insert( + format!("{}:{}", hwmon.index(), hwmon.name()), + Value::Object(hwmon_map), + ); + } + + map.insert(String::from("hwmon"), Value::Object(hwmons_map)); + } + + let epoch_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(u128::MAX); + map.insert(String::from("epoch_ms"), Value::from(epoch_ms as u64)); + + map +} + +fn main() -> io::Result<()> { + let cfg = Opts::from_args(); + let file = File::create(cfg.output)?; + let mut output = BufWriter::new(file); + let interval = Duration::from_millis(cfg.interval_ms); + + loop { + let now = Instant::now(); + + serde_json::to_writer(&mut 
output, &gather())?; + writeln!(&mut output)?; + output.flush()?; + + thread::sleep(interval.saturating_sub(now.elapsed())); + } +} diff --git a/betree/haura-benchmarks/src/bufreader.rs b/betree/haura-benchmarks/src/bufreader.rs new file mode 100644 index 00000000..9b16f938 --- /dev/null +++ b/betree/haura-benchmarks/src/bufreader.rs @@ -0,0 +1,39 @@ +//! This is a small wrapper around the std BufReader, +//! which uses seek_relative when possible. +//! This prevents unnecessary buffer discards, +//! and massively improves performance of the initial parsing +//! of zip archive metadata. +use std::io::{self, BufReader, Read, Seek, SeekFrom}; + +pub struct BufReaderSeek { + b: BufReader, +} + +impl BufReaderSeek { + pub fn new(r: R) -> Self { + Self { + b: BufReader::new(r), + } + } +} + +impl Read for BufReaderSeek { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.b.read(buf) + } + + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.b.read_exact(buf) + } +} + +impl Seek for BufReaderSeek { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + if let SeekFrom::Current(offset) = pos { + self.b.seek_relative(offset)?; + self.b.stream_position() + } else { + self.b.seek(pos) + } + } +} diff --git a/betree/haura-benchmarks/src/checkpoints.rs b/betree/haura-benchmarks/src/checkpoints.rs new file mode 100644 index 00000000..5ef493b1 --- /dev/null +++ b/betree/haura-benchmarks/src/checkpoints.rs @@ -0,0 +1,53 @@ +///! This case implements a checkpoint like writing test in which multiple +///! objects are created on the preferred fastest speed and later migrated +///! downwards once they are no longer needed. +///! +///! A sync is performed after each object batch to ensure that data is safe +///! before continuing. 
+use betree_perf::*; +use betree_storage_stack::StoragePreference; +use rand::RngCore; +use std::{error::Error, io::Write}; +use std::io::BufWriter; + +pub fn run(mut client: Client) -> Result<(), Box> { + const N_OBJECTS: usize = 5; + const OBJECT_SIZE_MIB: [u64; N_OBJECTS] = [256, 256, 1, 384, 128]; + const N_GENERATIONS: usize = 70; + const MIN_WAIT_MS: u64 = 1500; + const WAIT_RAND_RANGE: u64 = 400; + println!("running checkpoints"); + + + let mut stats = BufWriter::new(std::fs::OpenOptions::new().create(true).write(true).open("checkpoints.csv")?); + stats.write_all(b"generation,size_mib,object_num,time_ms\n")?; + for gen in 0..N_GENERATIONS { + let start = std::time::Instant::now(); + let mut accumulated_size = 0; + for obj_id in 0..N_OBJECTS { + let key = format!("{gen}_{obj_id}"); + println!("Creating {key}"); + let (obj, _info) = client + .object_store + .open_or_create_object_with_pref(key.as_bytes(), StoragePreference::FASTEST)?; + // We definitely want to write on the fastest layer to minimize + // waiting inbetween computation. + let mut cursor = obj.cursor_with_pref(StoragePreference::FASTEST); + accumulated_size+= OBJECT_SIZE_MIB[obj_id]; + with_random_bytes( + &mut client.rng, + OBJECT_SIZE_MIB[obj_id] * 1024 * 1024, + 8 * 1024 * 1024, + |b| cursor.write_all(b), + )?; + } + stats.write_all(format!("{gen},{accumulated_size},{N_OBJECTS},{}", start.elapsed().as_millis()).as_bytes())?; + std::thread::sleep(std::time::Duration::from_millis( + client.rng.next_u64() % WAIT_RAND_RANGE + MIN_WAIT_MS, + )); + client.sync().expect("Failed to sync database"); + } + client.sync().expect("Failed to sync database"); + stats.flush()?; + Ok(()) +} diff --git a/betree/haura-benchmarks/src/filesystem.rs b/betree/haura-benchmarks/src/filesystem.rs new file mode 100644 index 00000000..92af4e22 --- /dev/null +++ b/betree/haura-benchmarks/src/filesystem.rs @@ -0,0 +1,159 @@ +///! 
+use betree_perf::*; +use betree_storage_stack::vdev::Block; +use betree_storage_stack::StoragePreference; +use rand::{ + distributions::{DistIter, Slice}, + thread_rng, Rng, +}; +use std::{error::Error, io::Write, ops::Range}; + +fn pref(foo: u8, size: Block, client: &Client) -> StoragePreference { + let space = client.database.read().free_space_tier(); + match foo { + 0 if Block(space[0].free.0 - size.0) > Block((space[0].total.0 as f64 * 0.2) as u64) => { + StoragePreference::FASTEST + } + 1 if Block(space[1].free.0 - size.0) > Block((space[1].total.0 as f64 * 0.2) as u64) => { + StoragePreference::FAST + } + 2 if Block(space[2].free.0 - size.0) > Block((space[2].total.0 as f64 * 0.2) as u64) => { + StoragePreference::SLOW + } + 3.. => panic!(), + _ => pref(foo + 1, size, client), + } +} + +// barely, seldom, often +const PROBS: [f64; 3] = [0.01, 0.2, 0.9]; + +// LANL size reference +const SIZES: [u64; 5] = [ + 64 * 1000, + 256 * 1000, + 1 * 1000 * 1000, + 4 * 1000 * 1000, + 1 * 1000 * 1000 * 1000, +]; +// Tuple describing the file distribution +const GROUPS_SPEC: [[usize; 5]; 3] = [ + [1022, 256, 1364, 1364, 24], + [164, 40, 220, 220, 4], + [12, 4, 16, 16, 2], +]; + +const TIERS: Range = 0..3; + +pub fn run(mut client: Client) -> Result<(), Box> { + println!("running filesystem"); + println!("initialize state"); + let mut groups = vec![]; + let mut counter: u64 = 1; + let start = std::time::Instant::now(); + for t_id in 0..3 { + groups.push(vec![]); + let objs = groups.last_mut().unwrap(); + for (count, size) in GROUPS_SPEC[t_id].iter().zip(SIZES.iter()) { + for _ in 0..*count { + let pref = pref( + client.rng.gen_range(TIERS), + Block::from_bytes(*size), + &client, + ); + let key = format!("key{counter}").into_bytes(); + let (obj, _info) = client + .object_store + .open_or_create_object_with_pref(&key, pref)?; + objs.push(key); + counter += 1; + let mut cursor = obj.cursor_with_pref(pref); + with_random_bytes(&mut client.rng, *size, 8 * 1024 * 1024, |b| { + 
cursor.write_all(b) + })?; + } + } + } + + println!("sync db"); + client.sync().expect("Failed to sync database"); + + println!("start conditioning"); + let mut buf = vec![0; 2 * 1024 * 1024 * 1024]; + let mut samplers: Vec> = groups + .iter() + .map(|ob| thread_rng().sample_iter(Slice::new(&ob).unwrap())) + .collect(); + while start.elapsed().as_secs() < 1200 { + // println!("Reading generation {run} of {RUNS}"); + for (id, prob) in PROBS.iter().enumerate() { + if client.rng.gen_bool(*prob) { + let obj = samplers[id].next().unwrap(); + let obj = client.object_store.open_object(obj)?.unwrap(); + obj.read_at(&mut buf, 0).map_err(|e| e.1)?; + } + } + } + // Allow for some cooldown and migration ending... + std::thread::sleep(std::time::Duration::from_secs(30)); + println!("sync db"); + client.sync().expect("Failed to sync database"); + // pick certain files which we know are in range, here we pick 3x64KB, 3x256KB, 3x1MB, 3x4MB, 1x1GB + // Read individual files multiple times to see the cache working? + const SELECTION: [usize; 5] = [3, 3, 3, 3, 1]; + println!("start measuring"); + let f = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open("filesystem_measurements.csv")?; + let mut w = std::io::BufWriter::new(f); + w.write_all(b"key,size,read_latency_ns,write_latency_ns,group\n")?; + + for (n, _) in GROUPS_SPEC.iter().enumerate() { + for (idx, sel_num) in SELECTION.iter().enumerate() { + let obj_num = GROUPS_SPEC[n][idx]; + let okstart = obj_key_start(n, idx); + let okend = okstart + obj_num; + for _ in 0..*sel_num { + client.database.read().drop_cache()?; + let obj_key = format!("key{}", client.rng.gen_range(okstart..=okend)); + let obj = client + .object_store + .open_object(obj_key.as_bytes())? 
+ .expect("Known object could not be opened"); + let start = std::time::Instant::now(); + obj.read_at(&mut buf, 0).map_err(|e| e.1)?; + let read_time = start.elapsed(); + let size = SIZES[idx]; + let mut cursor = obj.cursor(); + let start = std::time::Instant::now(); + with_random_bytes(&mut client.rng, size, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + let write_time = start.elapsed(); + w.write_all( + format!( + "{obj_key},{size},{},{},{n}\n", + read_time.as_nanos(), + write_time.as_nanos() + ) + .as_bytes(), + )?; + client.sync()?; + client.database.read().drop_cache()?; + std::thread::sleep(std::time::Duration::from_secs(20)); + } + } + } + w.flush()?; + Ok(()) +} + +fn obj_key_start(tier: usize, group: usize) -> usize { + let mut tier_offset = 0; + for idx in 0..tier { + tier_offset += GROUPS_SPEC[idx].iter().sum::(); + } + let group_offset = GROUPS_SPEC[tier].iter().take(group).sum::(); + tier_offset + group_offset +} diff --git a/betree/haura-benchmarks/src/filesystem_zip.rs b/betree/haura-benchmarks/src/filesystem_zip.rs new file mode 100644 index 00000000..d5286545 --- /dev/null +++ b/betree/haura-benchmarks/src/filesystem_zip.rs @@ -0,0 +1,154 @@ +///! +use betree_perf::*; +use betree_storage_stack::vdev::Block; +use betree_storage_stack::StoragePreference; +use rand::{ + distributions::{DistIter, Slice}, + seq::SliceRandom, + thread_rng, Rng, +}; +use std::{ + error::Error, + io::{Read, Write}, + ops::Range, + path::Path, +}; + +fn pref(foo: u8, size: Block, client: &Client) -> StoragePreference { + let space = client.database.read().free_space_tier(); + match foo { + 0 if Block(space[0].free.0 - size.0) > Block((space[0].total.0 as f64 * 0.2) as u64) => { + StoragePreference::FASTEST + } + 1 if Block(space[1].free.0 - size.0) > Block((space[1].total.0 as f64 * 0.2) as u64) => { + StoragePreference::FAST + } + 2 if Block(space[2].free.0 - size.0) > Block((space[2].total.0 as f64 * 0.2) as u64) => { + StoragePreference::SLOW + } + 3.. 
=> panic!(), + _ => pref(foo + 1, size, client), + } +} + +// barely, seldom, often +const PROBS: [f64; 3] = [0.01, 0.2, 0.9]; + +const TIERS: Range = 0..3; + +const GROUPS: [f32; 3] = [0.8, 0.15, 0.05]; +const NUM_SAMPLE: usize = 50; + +pub fn run(mut client: Client, zip_path: impl AsRef) -> Result<(), Box> { + println!("running filesystem"); + println!("initialize state"); + + let file = std::fs::OpenOptions::new().read(true).open(zip_path)?; + let mut zip = zip::ZipArchive::new(file).unwrap(); + + // Create objects + let start = std::time::Instant::now(); + + // use expandable vector + let mut buf = Vec::new(); + let file_names = zip + .file_names() + .map(|n| n.to_string()) + .collect::>(); + let mut file_name_with_size = vec![]; + let file_num = file_names.len(); + for file in file_names.into_iter() { + // Read each file and insert randomly as new object into the object store + let mut zfile = zip.by_name(&file).unwrap(); + zfile.read_to_end(&mut buf)?; + let size = zfile.compressed_size(); + let pref = pref( + client.rng.gen_range(TIERS), + Block::from_bytes(size), + &client, + ); + let (obj, _) = client + .object_store + .open_or_create_object_with_pref(file.as_bytes(), pref)?; + obj.write_at_with_pref(&buf, 0, pref).map_err(|e| e.1)?; + file_name_with_size.push((file, size)); + buf.clear(); + } + + // Create groups + let mut groups: [Vec<(String, u64)>; 3] = [0; 3].map(|_| vec![]); + let mut distributed = 0usize; + file_name_with_size.shuffle(&mut client.rng); + for (id, part) in GROUPS.iter().enumerate() { + let num = (part * file_num as f32) as usize; + groups[id] = file_name_with_size[distributed..(distributed + num)].to_vec(); + distributed += num; + } + + println!("sync db"); + client.sync().expect("Failed to sync database"); + + println!("start conditioning"); + let mut buf = vec![0; 5 * 1024 * 1024 * 1024]; + let mut samplers: Vec> = groups + .iter() + .map(|ob| thread_rng().sample_iter(Slice::new(&ob).unwrap())) + .collect(); + while 
start.elapsed().as_secs() < 1200 { + // println!("Reading generation {run} of {RUNS}"); + for (id, prob) in PROBS.iter().enumerate() { + if client.rng.gen_bool(*prob) { + let obj = samplers[id].next().unwrap(); + let obj = client.object_store.open_object(obj.0.as_bytes())?.unwrap(); + obj.read_at(&mut buf, 0).map_err(|e| e.1)?; + } + } + } + // Allow for some cooldown and migration ending... + std::thread::sleep(std::time::Duration::from_secs(30)); + println!("sync db"); + client.sync().expect("Failed to sync database"); + // // pick certain files which we know are in range, here we pick 3x64KB, 3x256KB, 3x1MB, 3x4MB, 1x1GB + // // Read individual files multiple times to see the cache working? + // const SELECTION: [usize; 5] = [3, 3, 3, 3, 1]; + println!("start measuring"); + let f = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open("filesystem_zip_measurements.csv")?; + let mut w = std::io::BufWriter::new(f); + w.write_all(b"key,size,read_latency_ns,write_latency_ns,group\n")?; + + for (idx, sampler) in groups.iter().enumerate() { + for _ in 0..NUM_SAMPLE { + client.database.read().drop_cache()?; + let (obj_key, size) = sampler.choose(&mut client.rng).unwrap(); + let obj = client + .object_store + .open_object(obj_key.as_bytes())? 
+ .expect("Known object could not be opened"); + let start = std::time::Instant::now(); + obj.read_at(&mut buf, 0).map_err(|e| e.1)?; + let read_time = start.elapsed(); + let mut cursor = obj.cursor(); + let start = std::time::Instant::now(); + with_random_bytes(&mut client.rng, *size, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + let write_time = start.elapsed(); + w.write_all( + format!( + "{obj_key},{size},{},{},{idx}\n", + read_time.as_nanos(), + write_time.as_nanos() + ) + .as_bytes(), + )?; + client.sync()?; + client.database.read().drop_cache()?; + std::thread::sleep(std::time::Duration::from_secs(5)); + } + } + w.flush()?; + Ok(()) +} diff --git a/betree/haura-benchmarks/src/ingest.rs b/betree/haura-benchmarks/src/ingest.rs new file mode 100644 index 00000000..0531fa4e --- /dev/null +++ b/betree/haura-benchmarks/src/ingest.rs @@ -0,0 +1,28 @@ +use betree_perf::*; +use betree_storage_stack::StoragePreference; +use std::{ + error::Error, + fs::File, + io::{self, Seek, SeekFrom}, + path::Path, +}; + +pub fn run(client: &mut Client, path: impl AsRef) -> Result<(), Box> { + println!("running ingest::run"); + + let os = &client.object_store; + + let mut input = File::open(path.as_ref())?; + + let (obj, _info) = os.open_or_create_object_with_pref(b"obj", StoragePreference::FASTEST)?; + let mut cursor = obj.cursor(); + + io::copy(&mut input, &mut cursor)?; + + client.sync().expect("Failed to sync database"); + + cursor.seek(SeekFrom::Start(0))?; + io::copy(&mut cursor, &mut io::sink())?; + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/lib.rs b/betree/haura-benchmarks/src/lib.rs new file mode 100644 index 00000000..4b8e9f24 --- /dev/null +++ b/betree/haura-benchmarks/src/lib.rs @@ -0,0 +1,154 @@ +use std::{ + env, + fs::File, + io::{self, BufWriter, Write}, + path::{Path, PathBuf}, + sync::Arc, + thread, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +use betree_storage_stack::{ + database::{self, AccessMode}, + 
env_logger::init_env_logger, + metrics, object, DatabaseConfiguration, StoragePreference, +}; +use figment::providers::Format; +use parking_lot::RwLock; +use procfs::process::Process; +use rand::{Rng, SeedableRng}; +use rand_xoshiro::Xoshiro256Plus; + +pub mod bufreader; + +pub type Database = database::Database; +pub type Dataset = database::Dataset; +pub type ObjectStore = object::ObjectStore; + +pub struct Control { + pub database: Arc>, +} + +impl Control { + pub fn with_custom_config(modify_cfg: impl Fn(&mut DatabaseConfiguration)) -> Self { + init_env_logger(); + + let conf_path = env::var("BETREE_CONFIG").expect("Didn't provide a BETREE_CONFIG"); + + let mut cfg: DatabaseConfiguration = figment::Figment::new() + .merge(DatabaseConfiguration::figment_default()) + .merge(figment::providers::Json::file(conf_path)) + .merge(DatabaseConfiguration::figment_env()) + .extract() + .expect("Failed to extract DatabaseConfiguration"); + + cfg.access_mode = AccessMode::AlwaysCreateNew; + + cfg.sync_interval_ms = Some(1000); + + cfg.metrics = Some(metrics::MetricsConfiguration { + enabled: true, + interval_ms: 500, + output_path: PathBuf::from("betree-metrics.jsonl"), + }); + + modify_cfg(&mut cfg); + + log::info!("using {:?}", cfg); + + let database = Database::build_threaded(cfg).expect("Failed to open database"); + + Control { database } + } + + pub fn new() -> Self { + Self::with_custom_config(|_| {}) + } + + pub fn client(&mut self, id: u32, task: &[u8]) -> Client { + let mut lock = self.database.write(); + + let os = lock + .open_named_object_store(task, StoragePreference::NONE) + .expect("Failed to create/open object store"); + Client { + database: self.database.clone(), + rng: Xoshiro256Plus::seed_from_u64(id as u64), + object_store: os, + } + } +} + +pub struct Client { + pub database: Arc>, + pub rng: Xoshiro256Plus, + pub object_store: ObjectStore, +} + +impl Client { + pub fn sync(&self) -> database::Result<()> { + self.database.write().sync() + } +} + +pub fn 
log_process_info(path: impl AsRef, interval_ms: u64) -> io::Result<()> { + let file = File::create(path)?; + let mut output = BufWriter::new(file); + let interval = Duration::from_millis(interval_ms); + + let ticks = procfs::ticks_per_second() as f64; + let page_size = procfs::page_size(); + + loop { + let now = Instant::now(); + + if let Ok(proc) = Process::myself() { + let stats = proc.stat().map_err(|e| io::Error::other(e))?; + let info = serde_json::json!({ + "vsize": stats.vsize, + "rss": stats.rss * page_size, + "utime": stats.utime as f64 / ticks, + "stime": stats.stime as f64 / ticks, + "cutime": stats.cutime as f64 / ticks, + "cstime": stats.cstime as f64 / ticks, + "minflt": stats.minflt, + "cminflt": stats.cminflt, + "majflt": stats.majflt, + "cmajflt": stats.cmajflt + }); + + serde_json::to_writer( + &mut output, + &serde_json::json!({ + "epoch_ms": SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or(u128::MAX) as u64, + "proc": info + }), + )?; + output.write_all(b"\n")?; + output.flush()?; + } + + thread::sleep(interval.saturating_sub(now.elapsed())); + } +} + +pub fn with_random_bytes( + mut rng: impl Rng, + mut n_bytes: u64, + buf_size: usize, + mut callback: impl FnMut(&[u8]) -> Result<(), E>, +) -> Result<(), E> { + let mut buf = vec![0; buf_size]; + while n_bytes > 0 { + rng.fill(&mut buf[..]); + if let Err(e) = callback(&buf[..buf_size.min(n_bytes as usize)]) { + return Err(e); + } + n_bytes = n_bytes.saturating_sub(buf_size as u64); + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/main.rs b/betree/haura-benchmarks/src/main.rs new file mode 100644 index 00000000..2a4629d6 --- /dev/null +++ b/betree/haura-benchmarks/src/main.rs @@ -0,0 +1,181 @@ +use std::{error::Error, path::PathBuf, process, thread, time::Duration}; + +use betree_perf::Control; +use structopt::StructOpt; + +mod checkpoints; +mod filesystem; +mod filesystem_zip; +mod ingest; +mod rewrite; +mod scientific_evaluation; +mod switchover; 
+mod tiered1; +mod zip; + +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +#[derive(StructOpt)] +enum Mode { + Tiered1, + Checkpoints, + Filesystem, + FilesystemZip { + path: PathBuf, + }, + EvaluationRead { + #[structopt(default_value = "120")] + runtime: u64, + size: u64, + samples: u64, + min_size: u64, + max_size: u64, + }, + EvaluationRW { + #[structopt(default_value = "120")] + runtime: u64, + size: u64, + samples: u64, + min_size: u64, + max_size: u64, + #[structopt(default_value = "0.5")] + ratio: f64, + }, + Zip { + n_clients: u32, + runs_per_client: u32, + files_per_run: u32, + path: PathBuf, + start_of_eocr: u64, + }, + Ingest { + path: PathBuf, + }, + Switchover { + part_count: u64, + part_size: u64, + }, + Rewrite { + object_size: u64, + rewrite_count: u64, + }, +} + +fn run_all(mode: Mode) -> Result<(), Box> { + thread::spawn(|| betree_perf::log_process_info("proc.jsonl", 250)); + + let root = std::env::var("ROOT").expect("Didn't provide a repository ROOT"); + let mut sysinfo = process::Command::new(format!("{root}/target/release/sysinfo-log")) + .args(&["--output", "sysinfo.jsonl", "--interval-ms", "250"]) + .spawn()?; + + let mut control = Control::new(); + + match mode { + Mode::Tiered1 => { + let client = control.client(0, b"tiered1"); + tiered1::run(client)?; + control.database.write().sync()?; + } + Mode::Checkpoints => { + let client = control.client(0, b"checkpoints"); + checkpoints::run(client)?; + control.database.write().sync()?; + } + Mode::Filesystem => { + let client = control.client(0, b"filesystem"); + filesystem::run(client)?; + control.database.write().sync()?; + } + Mode::FilesystemZip { path } => { + let client = control.client(0, b"filesystem_zip"); + filesystem_zip::run(client, path)?; + control.database.write().sync()?; + } + Mode::EvaluationRead { + runtime, + size, + samples, + min_size, + max_size, + } => { + let client = control.client(0, b"scientific_evaluation"); + let config = 
scientific_evaluation::EvaluationConfig { + runtime, + size, + samples, + min_size, + max_size, + }; + scientific_evaluation::run_read_write(client, config, 1.0, "read")?; + control.database.write().sync()?; + } + Mode::EvaluationRW { + runtime, + size, + samples, + min_size, + max_size, + ratio, + } => { + let client = control.client(0, b"scientific_evaluation"); + let config = scientific_evaluation::EvaluationConfig { + runtime, + size, + samples, + min_size, + max_size, + }; + scientific_evaluation::run_read_write(client, config, ratio.clamp(0.0, 1.0), "rw")?; + control.database.write().sync()?; + } + Mode::Zip { + path, + start_of_eocr, + n_clients, + runs_per_client, + files_per_run, + } => { + let mut client = control.client(0, b"zip"); + + zip::prepare(&mut client, path, start_of_eocr)?; + control.database.write().sync()?; + + zip::read(&mut client, n_clients, runs_per_client, files_per_run)?; + } + Mode::Ingest { path } => { + let mut client = control.client(0, b"ingest"); + ingest::run(&mut client, path)?; + } + Mode::Switchover { + part_count, + part_size, + } => { + let mut client = control.client(0, b"switchover"); + switchover::run(&mut client, part_count, part_size)?; + } + Mode::Rewrite { + object_size, + rewrite_count, + } => { + let mut client = control.client(0, b"rewrite"); + rewrite::run(&mut client, object_size, rewrite_count)?; + } + } + + thread::sleep(Duration::from_millis(2000)); + + sysinfo.kill()?; + sysinfo.wait()?; + + Ok(()) +} + +fn main() { + let mode = Mode::from_args(); + if let Err(e) = run_all(mode) { + eprintln!("error: {}", e); + process::exit(1); + } +} diff --git a/betree/haura-benchmarks/src/rewrite.rs b/betree/haura-benchmarks/src/rewrite.rs new file mode 100644 index 00000000..6c7da2db --- /dev/null +++ b/betree/haura-benchmarks/src/rewrite.rs @@ -0,0 +1,35 @@ +use betree_perf::*; +use betree_storage_stack::StoragePreference; +use std::{ + error::Error, + io::{Seek, SeekFrom, Write}, +}; + +pub fn run( + client: &mut 
Client, + object_size: u64, + rewrite_count: u64, +) -> Result<(), Box> { + println!("running rewrite::run"); + + // SLOWEST will fail in our config, so it serves as an early-failure mechanism for unset prefs + let (obj, _info) = client + .object_store + .open_or_create_object_with_pref(b"obj", StoragePreference::FASTEST)?; + let mut cursor = obj.cursor(); + + for _rewrite in 0..rewrite_count { + cursor.seek(SeekFrom::Start(0))?; + + with_random_bytes( + &mut client.rng, + object_size, + object_size.min(128 * 1024) as usize, + |b| cursor.write_all(b), + )?; + + client.sync().expect("Failed to sync database"); + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/scientific_evaluation.rs b/betree/haura-benchmarks/src/scientific_evaluation.rs new file mode 100644 index 00000000..782ac2e7 --- /dev/null +++ b/betree/haura-benchmarks/src/scientific_evaluation.rs @@ -0,0 +1,92 @@ +///! This file implements a scientific workflow style writing first serial data onto a storage layer and then reading this data from storage in a somewhat +///! random but repeating pattern. 
+use betree_perf::*; +use betree_storage_stack::StoragePreference; +use rand::Rng; +use std::{ + error::Error, + io::{Seek, Write}, +}; + +const OBJ_NAME: &[u8] = b"important_research"; + +pub struct EvaluationConfig { + pub runtime: u64, + pub size: u64, + pub samples: u64, + pub min_size: u64, + pub max_size: u64, +} + +fn gen_positions(client: &mut Client, config: &EvaluationConfig) -> Vec<(u64, u64)> { + let mut positions = vec![]; + for _ in 0..config.samples { + let start = client.rng.gen_range(0..config.size); + let length = client.rng.gen_range(config.min_size..config.max_size); + positions.push((start, length.clamp(0, config.size.saturating_sub(start)))); + } + positions +} + +fn prepare_store(client: &mut Client, config: &EvaluationConfig) -> Result<(), Box> { + let start = std::time::Instant::now(); + let (obj, _info) = client + .object_store + .open_or_create_object_with_pref(b"important_research", StoragePreference::SLOW)?; + let mut cursor = obj.cursor_with_pref(StoragePreference::SLOW); + + with_random_bytes(&mut client.rng, config.size, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + println!("Initial write took {}s", start.elapsed().as_secs()); + client.sync().expect("Failed to sync database"); + Ok(()) +} + +pub fn run_read_write( + mut client: Client, + config: EvaluationConfig, + rw: f64, + name: &str, +) -> Result<(), Box> { + println!("running scientific_evaluation"); + // Generate positions to read + let positions = gen_positions(&mut client, &config); + prepare_store(&mut client, &config)?; + + let (obj, _info) = client + .object_store + .open_object_with_info(OBJ_NAME)? 
+ .expect("Object was just created, but can't be opened!"); + let mut cursor = obj.cursor(); + + let start = std::time::Instant::now(); + let mut buf = vec![0; config.max_size as usize]; + let f = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(format!("evaluation_{name}.csv"))?; + let mut w = std::io::BufWriter::new(f); + w.write_all(b"offset,size,latency_ns,op\n")?; + for (pos, len) in positions.iter().cycle() { + // Read data as may be done in some evaluation where only parts of a + // database file are read in. + if client.rng.gen_bool(rw) { + let t = std::time::Instant::now(); + obj.read_at(&mut buf[..*len as usize], *pos).unwrap(); + w.write_fmt(format_args!("{pos},{len},{},r\n", t.elapsed().as_nanos()))?; + } else { + cursor.seek(std::io::SeekFrom::Start(*pos))?; + let t = std::time::Instant::now(); + with_random_bytes(&mut client.rng, *len, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + w.write_fmt(format_args!("{pos},{len},{},w\n", t.elapsed().as_nanos()))?; + } + if start.elapsed().as_secs() >= config.runtime { + break; + } + } + w.flush()?; + Ok(()) +} diff --git a/betree/haura-benchmarks/src/switchover.rs b/betree/haura-benchmarks/src/switchover.rs new file mode 100644 index 00000000..ed9f7543 --- /dev/null +++ b/betree/haura-benchmarks/src/switchover.rs @@ -0,0 +1,41 @@ +use betree_perf::*; +use betree_storage_stack::StoragePreference; +use std::{ + error::Error, + io::{self, Seek, SeekFrom, Write}, +}; + +pub fn run(client: &mut Client, part_count: u64, part_size: u64) -> Result<(), Box> { + println!("running switchover::run"); + + // SLOWEST will fail in our config, so it serves as an early-failure mechanism for unset prefs + let (obj, _info) = client + .object_store + .open_or_create_object_with_pref(b"obj", StoragePreference::SLOWEST)?; + let mut cursor = obj.cursor(); + + for oddness in [0, 1] { + cursor.seek(SeekFrom::Start(0))?; + + for part_idx in 0..part_count { + cursor.set_storage_preference(if part_idx % 2 == 
oddness { + StoragePreference::FASTEST + } else { + StoragePreference::FAST + }); + with_random_bytes( + &mut client.rng, + part_size, + part_size.min(128 * 1024) as usize, + |b| cursor.write_all(b), + )?; + } + + client.sync().expect("Failed to sync database"); + + cursor.seek(SeekFrom::Start(0))?; + io::copy(&mut cursor, &mut io::sink())?; + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/tiered1.rs b/betree/haura-benchmarks/src/tiered1.rs new file mode 100644 index 00000000..663df55e --- /dev/null +++ b/betree/haura-benchmarks/src/tiered1.rs @@ -0,0 +1,56 @@ +use betree_perf::*; +use betree_storage_stack::StoragePreference; +use std::{error::Error, io::Write}; + +pub fn run(mut client: Client) -> Result<(), Box> { + const N_OBJECTS: u64 = 1; + const OBJECT_SIZE: u64 = 5 * 1024 * 1024 * 1024; + println!("running tiered1"); + + let os = &client.object_store; + + let mut objects = Vec::new(); + + for obj_idx in 0..N_OBJECTS { + let name = format!("foo-{}", obj_idx); + println!("using {}", name); + objects.push(name.clone()); + + let (obj, _info) = + os.open_or_create_object_with_pref(name.as_bytes(), StoragePreference::FASTEST)?; + let mut cursor = obj.cursor_with_pref(StoragePreference::FAST); + + with_random_bytes(&mut client.rng, OBJECT_SIZE / 2, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + + cursor.set_storage_preference(StoragePreference::FASTEST); + + with_random_bytes(&mut client.rng, OBJECT_SIZE / 2, 8 * 1024 * 1024, |b| { + cursor.write_all(b) + })?; + } + + client.sync().expect("Failed to sync database"); + + for obj_name in objects { + let (obj, info) = os + .open_object_with_info(obj_name.as_bytes())? + .expect("Object was just created, but can't be opened!"); + + assert_eq!(info.size, OBJECT_SIZE); + let size = obj + .read_all_chunks()? 
+ .map(|chunk| { + if let Ok((_k, v)) = chunk { + v.len() as u64 + } else { + 0 + } + }) + .sum::(); + assert_eq!(info.size, size); + } + + Ok(()) +} diff --git a/betree/haura-benchmarks/src/zip.rs b/betree/haura-benchmarks/src/zip.rs new file mode 100644 index 00000000..183bbcb7 --- /dev/null +++ b/betree/haura-benchmarks/src/zip.rs @@ -0,0 +1,103 @@ +use betree_perf::*; +use betree_storage_stack::StoragePreference; +use std::{ + error::Error, + fs::File, + io::{self, Read, Write}, + path::Path, +}; + +use rand::{seq::SliceRandom, SeedableRng}; +use rand_xoshiro::Xoshiro256Plus; +use zip::*; + +pub fn prepare( + client: &mut Client, + path: impl AsRef, + start_of_eocr: u64, +) -> Result<(), Box> { + println!("running zip::prepare"); + + let os = &client.object_store; + + let mut zip_archive = File::open(path.as_ref())?; + let zip_archive_len = zip_archive.metadata()?.len(); + assert!(start_of_eocr < zip_archive_len); + + let (zip, _info) = + os.open_or_create_object_with_pref(b"archive.zip", StoragePreference::FAST)?; + let mut cursor = zip.cursor(); + + let mut buf = vec![0; 128 * 1024]; + let mut idx = 0; + while idx < zip_archive_len { + let max_bytes = buf.len().min(zip_archive_len as usize - idx as usize); + let n_read = zip_archive.read(&mut buf[..max_bytes])?; + + // let is_metadata = idx < 1024 * 1024 || idx + n_read as u64 + 1024 * 1024 >= start_of_eocr; + let is_metadata = idx + n_read as u64 >= start_of_eocr; + cursor.set_storage_preference(if is_metadata { + StoragePreference::FASTEST + } else { + StoragePreference::FAST + }); + + cursor.write_all(&buf[..n_read])?; + idx += n_read as u64; + } + + client.sync().expect("Failed to sync database"); + + Ok(()) +} + +pub fn read( + client: &mut Client, + n_clients: u32, + runs_per_client: u32, + files_per_run: u32, +) -> Result<(), Box> { + println!("running zip::read"); + + let zip = client + .object_store + .open_object(b"archive.zip")? 
+ .expect("archive.zip doesn't exist"); + + let mut file_names = ZipArchive::new(bufreader::BufReaderSeek::new(zip.cursor()))? + .file_names() + .map(String::from) + .collect::>(); + + // file_names gives a non-deterministic order, sort for reproducible file selection + file_names.sort(); + + crossbeam::scope(|s| { + for client_id in 0..n_clients { + let mut rng = Xoshiro256Plus::seed_from_u64(client_id as u64); + let obj = zip.clone(); + let file_names = &file_names[..]; + + s.spawn(move |_| { + for _ in 0..runs_per_client { + let cursor = obj.cursor(); + let input = bufreader::BufReaderSeek::new(cursor); + let mut archive = ZipArchive::new(input).expect("Couldn't open zip archive"); + + for _ in 0..files_per_run { + let file_name = &file_names.choose(&mut rng).expect("Empty file name list"); + + let mut file = archive + .by_name(file_name) + .expect("Couldn't access file by name"); + + io::copy(&mut file, &mut io::sink()).expect("Couldn't read file"); + } + } + }); + } + }) + .expect("Child thread has panicked"); + + Ok(()) +}