From 1747bfaea4fa74c1f226819a068a9bc0f6b78161 Mon Sep 17 00:00:00 2001
From: Stan Brubaker <120737309+stanbrub@users.noreply.github.com>
Date: Mon, 13 Nov 2023 17:28:20 -0700
Subject: [PATCH] Support gzipped GCloud benchmark bucket files (#218)

---
 .github/scripts/fetch-results-local.sh   | 27 ++++++++--
 .github/workflows/remote-benchmarks.yml  | 13 +----
 .../run/profile/benchmark_tables.dh.py   | 53 +++++++++++++++----
 .../run/profile/parallel_download.py     | 32 +++++++++++
 4 files changed, 101 insertions(+), 24 deletions(-)
 mode change 100644 => 100755 .github/scripts/fetch-results-local.sh
 create mode 100644 src/main/resources/io/deephaven/benchmark/run/profile/parallel_download.py

diff --git a/.github/scripts/fetch-results-local.sh b/.github/scripts/fetch-results-local.sh
old mode 100644
new mode 100755
index 2ca75fbd..bf94dba4
--- a/.github/scripts/fetch-results-local.sh
+++ b/.github/scripts/fetch-results-local.sh
@@ -4,7 +4,8 @@ set -o errexit
 set -o pipefail
 set -o nounset
 
-# Fetches Benchmark results and logs from the remote test server
+# Fetches Benchmark results and logs from the remote test server and
+# compresses the runs before upload
 
 HOST=$1
 USER=$2
@@ -12,10 +13,30 @@ RUN_TYPE=$3
 RUN_DIR=/root/run
 
 if [[ $# != 3 ]]; then
-  echo "$0: Missing host, user, or run type arguments"
-  exit 1
+    echo "$0: Missing host, user, or run type arguments"
+    exit 1
 fi
 
+# Pull results and logs from the benchmark server
 scp -r ${USER}@${HOST}:${RUN_DIR}/results .
 scp -r ${USER}@${HOST}:${RUN_DIR}/logs .
+rm -rf ${RUN_TYPE}
 mv results/ ${RUN_TYPE}/
+
+# For now, remove any unwanted summaries before uploading to GCloud
+rm -f ${RUN_TYPE}/*.csv
+
+# Rename the svg summary table according to run type. Discard the rest
+TMP_SVG_DIR=${RUN_TYPE}/tmp-svg
+mkdir -p ${TMP_SVG_DIR}
+mv ${RUN_TYPE}/*.svg ${TMP_SVG_DIR}
+mv ${TMP_SVG_DIR}/${RUN_TYPE}-benchmark-summary.svg ${RUN_TYPE}/benchmark-summary.svg
+rm -rf ${TMP_SVG_DIR}
+
+# Compress the CSV results and archive the test logs for each run
+for runId in $(find ${RUN_TYPE}/ -name "run-*")
+do
+    (cd ${runId}; gzip *.csv)
+    (cd ${runId}/test-logs; tar -zcvf test-logs.tgz *; mv test-logs.tgz ../)
+    rm -rf ${runId}/test-logs/
+done
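A minimal sketch (not part of the patch) of how a consumer can read the artifacts the script above produces. The `nightly/run-abc123` paths are hypothetical stand-ins for a real `<run_type>/run-<id>` directory:

    import csv, gzip, tarfile

    # Hypothetical artifact paths produced by fetch-results-local.sh
    results_gz = 'nightly/run-abc123/benchmark-results.csv.gz'
    logs_tgz = 'nightly/run-abc123/test-logs.tgz'

    # Gzipped CSVs stream directly; no need to unpack them on disk
    with gzip.open(results_gz, 'rt', newline='') as f:
        rows = list(csv.reader(f))
    print(len(rows), 'result rows')

    # The archived test logs can be listed without extracting
    with tarfile.open(logs_tgz, 'r:gz') as tar:
        print(tar.getnames())
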
diff --git a/.github/workflows/remote-benchmarks.yml b/.github/workflows/remote-benchmarks.yml
index c3570777..ed4919a6 100644
--- a/.github/workflows/remote-benchmarks.yml
+++ b/.github/workflows/remote-benchmarks.yml
@@ -63,20 +63,9 @@ jobs:
         run: |
           ${SD}/run-ssh-local.sh ${HOST} ${USER} ${SD} run-benchmarks-remote ${RUN_TYPE}
 
-      - name: Fetch Benchmark Results
+      - name: Fetch Benchmark Results and Prepare for Upload
        run: |
          ${SD}/fetch-results-local.sh ${HOST} ${USER} ${RUN_TYPE}
-
-      - name: Manage Summary Results
-        run: |
-          # For now remove any unwanted summaries before uploading to GCloud
-          rm -f ${RUN_TYPE}/*.csv
-          # Rename the svg summary table according to run type. Discard the rest
-          TMP_SVG_DIR=${RUN_TYPE}/tmp-svg
-          mkdir -p ${TMP_SVG_DIR}
-          mv ${RUN_TYPE}/*.svg ${TMP_SVG_DIR}
-          mv ${TMP_SVG_DIR}/${RUN_TYPE}-benchmark-summary.svg ${RUN_TYPE}/benchmark-summary.svg
-          rm -rf ${TMP_SVG_DIR}
 
       - name: Authorize GCloud Credentials
         uses: google-github-actions/auth@v1
diff --git a/src/main/resources/io/deephaven/benchmark/run/profile/benchmark_tables.dh.py b/src/main/resources/io/deephaven/benchmark/run/profile/benchmark_tables.dh.py
index 71ade3f4..ad98f2bf 100644
--- a/src/main/resources/io/deephaven/benchmark/run/profile/benchmark_tables.dh.py
+++ b/src/main/resources/io/deephaven/benchmark/run/profile/benchmark_tables.dh.py
@@ -7,7 +7,7 @@ import os, re, glob
 import deephaven.dtypes as dht
 from deephaven import read_csv, merge, agg, empty_table
-from urllib.request import urlopen
+from urllib.request import urlopen, urlretrieve
 
 # Schema for benchmark-results.csv
 s_results = {'benchmark_name':dht.string, 'origin':dht.string,'timestamp':dht.long,'test_duration':dht.double,
@@ -39,21 +39,56 @@ def get_run_ids(storage_uri, category, max_runs):
     else:
         return get_local_run_ids(storage_uri, category, max_runs)
 
+# Cache an HTTP url into a local directory and return the local path
+def cache_remote_csv(uri):
+    try:
+        out_path = re.sub('^http.*/deephaven-benchmark/', '/data/deephaven-benchmark/', uri)
+        os.makedirs(os.path.dirname(out_path), mode=0o777, exist_ok=True)
+    except Exception as ex:
+        print('Error creating cache location for:', uri, ':', ex)
+        return uri
+    try:
+        out_path_gz = out_path + '.gz'
+        if os.path.exists(out_path_gz): return out_path_gz
+        urlretrieve(uri + '.gz', out_path_gz)
+        print('Cache', uri + '.gz')
+        return out_path_gz
+    except Exception:
+        try:
+            if os.path.exists(out_path): return out_path
+            urlretrieve(uri, out_path)
+            print('Cache', uri)
+            return out_path
+        except Exception as ex:
+            print('Error caching file:', out_path, ':', ex)
+            return uri
+
+# Read csv into a table, preferring the gzipped copy and falling back to the plain csv
+def dh_read_csv(uri, schema=None):
+    uri = uri.replace('file:///','/')
+    uri = cache_remote_csv(uri) if uri.startswith('http') else uri
+    try:
+        tbl = read_csv(uri + '.gz', schema) if schema else read_csv(uri + '.gz')
+        print('Load ' + uri + '.gz')
+    except Exception:
+        tbl = read_csv(uri, schema) if schema else read_csv(uri)
+        print('Load ' + uri)
+    return tbl
+
 # Merge together benchmark runs from the GCloud bucket for the same csv (e.g. benchmark-results.csv)
 def merge_run_tables(parent_uri, run_ids, category, csv_file_name, schema = None):
-    merged_table = None
+    tables = []
     for run_id in run_ids:
         table_uri = parent_uri + '/' + category + '/run-' + run_id + '/' + csv_file_name
-        print("Getting " + table_uri)
-        table_csv = read_csv(table_uri, schema) if schema else read_csv(table_uri)
-        table_csv = table_csv.update(['run_id = "' + run_id + '"'])
-        merged_table = merge([merged_table, table_csv]) if merged_table else table_csv
-    return merged_table
+        table_csv = dh_read_csv(table_uri, schema)
+        table_csv = table_csv.update_view(['run_id = "' + run_id + '"'])
+        tables.append(table_csv)
+    return merge(tables)
 
 # Load standard tables from GCloud or local storage according to category
 # If this script is run from exec(), accept the benchmark_category_arg
 default_storage_uri = 'https://storage.googleapis.com/deephaven-benchmark'
-default_category = 'nightly'
+default_category = 'ZTEST'
 default_max_runs = 5
 default_history_runs = 5
@@ -140,7 +175,7 @@ def truncate(text, size):
 bench_results_change = bench_results_diff.sort(['benchmark_name', 'origin', 'deephaven_version', 'timestamp'])
 bench_results_change = bench_results_change.update_by(ops=[op_group, op_version], by=['benchmark_name', 'origin'])
-bench_results_change = bench_results_change.update(
+bench_results_change = bench_results_change.update_view(
     ['op_rate_variability=(float)rstd(op_group_rates)', 'op_rate_change=(float)rchange(op_group_rates)']
 )
 bench_results_change = bench_results_change.view(
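The caching logic above prefers a gzipped bucket object and quietly falls back to the uncompressed one. A standalone sketch of that gzip-first pattern, runnable without a Deephaven server; `fetch_csv` and its cache directory are hypothetical names, and the bucket URL is illustrative:

    import gzip, os
    from urllib.error import HTTPError
    from urllib.request import urlretrieve

    def fetch_csv(uri, cache_dir='/tmp/bench-cache'):
        os.makedirs(cache_dir, exist_ok=True)
        out_path = os.path.join(cache_dir, os.path.basename(uri))
        try:
            # Prefer the compressed copy; urlretrieve raises HTTPError (404) if absent
            urlretrieve(uri + '.gz', out_path + '.gz')
            return out_path + '.gz'
        except HTTPError:
            urlretrieve(uri, out_path)
            return out_path

    path = fetch_csv('https://storage.googleapis.com/deephaven-benchmark/nightly/run-abc123/benchmark-results.csv')
    opener = gzip.open if path.endswith('.gz') else open
    with opener(path, 'rt') as f:
        print(f.readline().rstrip())  # header row
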
diff --git a/src/main/resources/io/deephaven/benchmark/run/profile/parallel_download.py b/src/main/resources/io/deephaven/benchmark/run/profile/parallel_download.py
new file mode 100644
index 00000000..d78d4ec5
--- /dev/null
+++ b/src/main/resources/io/deephaven/benchmark/run/profile/parallel_download.py
@@ -0,0 +1,32 @@
+import asyncio, functools, re, os, traceback
+from urllib.request import urlretrieve
+
+def background(f):
+    def wrapped(*args, **kwargs):
+        return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))
+    return wrapped
+
+@background
+def download(url):
+    try:
+        out_path = re.sub('^http.*/deephaven-benchmark/', 'data/deephaven-benchmark/', url)
+        os.makedirs(os.path.dirname(out_path), mode=0o777, exist_ok=True)
+    except Exception:
+        print('Error creating download location for:', url, ':', traceback.format_exc())
+        return
+    try:
+        urlretrieve(url + '.gz', out_path + '.gz')
+        print('Got', out_path + '.gz')
+    except Exception:
+        try:
+            urlretrieve(url, out_path)
+            print('Got', out_path)
+        except Exception:
+            print('Error downloading file:', out_path, ':', traceback.format_exc())
+
+urls = [
+    ${downloadUrls}
+]
+
+for url in urls:
+    download(url)
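As a usage note, a minimal sketch of the decorator-based fan-out that parallel_download.py relies on. `work` is a hypothetical stand-in for a blocking download; gathering the futures is shown for completeness, though the script above simply lets the executor's threads drain before the interpreter exits. This assumes asyncio.get_event_loop() creates a loop when none is running in the main thread, which newer Python versions allow only with a DeprecationWarning:

    import asyncio, functools, time

    def background(f):
        # Hand the blocking call to the default thread-pool executor
        def wrapped(*args, **kwargs):
            return asyncio.get_event_loop().run_in_executor(None, functools.partial(f, *args, **kwargs))
        return wrapped

    @background
    def work(n):
        time.sleep(0.1)  # stand-in for a blocking urlretrieve() call
        return n * n

    loop = asyncio.get_event_loop()
    futures = [work(n) for n in range(5)]  # all five sleeps overlap in the thread pool
    print(loop.run_until_complete(asyncio.gather(*futures)))  # [0, 1, 4, 9, 16]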