[testing-on-gke part 1.4] Update parse_tests scripts based on workload config parsers #2271

Merged: 10 commits, Aug 21, 2024
177 changes: 125 additions & 52 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -15,13 +15,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json, os, pprint, subprocess
import sys
import dlio_workload

sys.path.append("../")
from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed

LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"
_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs"

record = {
"pod_name": "",
@@ -38,45 +40,78 @@
"lowest_memory": 0,
"highest_cpu": 0.0,
"lowest_cpu": 0.0,
"gcsfuse_mount_options": "",
}

if __name__ == "__main__":
bucketNames = [
"gke-dlio-unet3d-100kb-500k",
"gke-dlio-unet3d-150mb-5k",
"gke-dlio-unet3d-3mb-100k",
"gke-dlio-unet3d-500kb-1m",
]

try:
os.makedirs(LOCAL_LOGS_LOCATION)
except FileExistsError:
pass

for bucketName in bucketNames:
print(f"Download DLIO logs from the bucket {bucketName}...")
def downloadDlioOutputs(dlioWorkloads):
for dlioWorkload in dlioWorkloads:
print(f"Downloading DLIO logs from the bucket {dlioWorkload.bucket}...")
result = subprocess.run(
[
"gsutil",
"-m",
"gcloud",
"-q", # ignore prompts
"storage",
"cp",
"-r",
f"gs://{bucketName}/logs",
LOCAL_LOGS_LOCATION,
"--no-user-output-enabled", # do not print names of files being copied
f"gs://{dlioWorkload.bucket}/logs",
_LOCAL_LOGS_LOCATION,
],
capture_output=False,
text=True,
)
if result.returncode != 0:
print(f"failed to fetch DLIO logs, error: {result.stderr}")

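# For reference, the subprocess call above is roughly equivalent to running the
# following command manually (illustrative bucket name):
#   gcloud -q storage cp -r --no-user-output-enabled gs://<bucket>/logs ../../bin/dlio-logs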

if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="DLIO Unet3d test output parser",
description=(
"This program takes in a json workload configuration file and parses"
" it for valid DLIO workloads and the locations of their test outputs"
" on GCS. It downloads each such output object locally to"
" {_LOCAL_LOGS_LOCATION} and parses them for DLIO test runs, and then"
" dumps their output metrics into a CSV report file."
),
)
parser.add_argument(
"--workload-config",
help=(
"A json configuration file to define workloads that were run to"
" generate the outputs that should be parsed."
),
required=True,
)
parser.add_argument(
"--project-number",
help=(
"project-number (e.g. 93817472919) is needed to fetch the cpu/memory"
" utilization data from GCP."
),
required=True,
)
args = parser.parse_args()
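# Example invocation (hypothetical config file name; the project number is the
# sample value from the help text above):
#   python3 parse_logs.py --workload-config ./workloads.json --project-number 93817472919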

try:
os.makedirs(_LOCAL_LOGS_LOCATION)
except FileExistsError:
pass

dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
downloadDlioOutputs(dlioWorkloads)
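# ParseTestConfigForDlioWorkloads is expected to return an iterable of DLIO
# workload objects; downloadDlioOutputs above only relies on each of them
# exposing a .bucket attribute naming the GCS bucket that holds the workload's
# "logs" directory.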

"""
"{num_files_train}-{mean_file_size}-{batch_size}":
"mean_file_size": str
"num_files_train": str
"batch_size": str
"records":
"local-ssd": [record1, record2, record3, record4]
"gcsfuse-generic": [record1, record2, record3, record4]
"gcsfuse-file-cache": [record1, record2, record3, record4]
"gcsfuse-no-file-cache": [record1, record2, record3, record4]
"""
Expand All @@ -85,15 +120,31 @@
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

for root, _, files in os.walk(LOCAL_LOGS_LOCATION):
for root, _, files in os.walk(_LOCAL_LOGS_LOCATION):
if files:
print(f"Parsing directory {root} ...")
per_epoch_stats_file = root + "/per_epoch_stats.json"
summary_file = root + "/summary.json"

gcsfuse_mount_options = ""
gcsfuse_mount_options_file = root + "/gcsfuse_mount_options"
if os.path.isfile(gcsfuse_mount_options_file):
with open(gcsfuse_mount_options_file) as f:
gcsfuse_mount_options = f.read().strip()
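# The gcsfuse_mount_options file, when present, is expected to hold the mount
# options the test ran with as a single line, e.g. a comma-separated string
# such as "implicit-dirs" (illustrative value).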

with open(per_epoch_stats_file, "r") as f:
per_epoch_stats_data = json.load(f)
try:
per_epoch_stats_data = json.load(f)
except json.JSONDecodeError:
print(f"failed to json-parse {per_epoch_stats_file}")
continue

with open(summary_file, "r") as f:
summary_data = json.load(f)
try:
summary_data = json.load(f)
except json.JSONDecodeError:
print(f"failed to json-parse {summary_file}")
continue

for i in range(summary_data["epochs"]):
test_name = summary_data["hostname"]
@@ -107,6 +158,7 @@
"batch_size": part_list[4],
"records": {
"local-ssd": [],
"gcsfuse-generic": [],
"gcsfuse-file-cache": [],
"gcsfuse-no-file-cache": [],
},
@@ -134,11 +186,20 @@
r["end"] = standard_timestamp(per_epoch_stats_data[str(i + 1)]["end"])
if r["scenario"] != "local-ssd" and mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(
r["pod_name"], r["start"], r["end"]
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu(
r["pod_name"], r["start"], r["end"]
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
pass

r["gcsfuse_mount_options"] = gcsfuse_mount_options

pprint.pprint(r)

@@ -147,28 +208,23 @@

output[key]["records"][r["scenario"]][i] = r

output_order = [
"500000-102400-800",
"500000-102400-128",
"1000000-512000-800",
"1000000-512000-128",
"100000-3145728-200",
"5000-157286400-4",
scenario_order = [
"local-ssd",
"gcsfuse-generic",
"gcsfuse-no-file-cache",
"gcsfuse-file-cache",
]
scenario_order = ["local-ssd", "gcsfuse-no-file-cache", "gcsfuse-file-cache"]

output_file = open("./output.csv", "a")
output_file.write(
"File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse"
" Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU"
" (core),Pod,Start,End\n"
" (core),Pod,Start,End,GcsfuseMountOptions\n"
)

for key in output_order:
if key not in output:
continue
for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
@@ -177,21 +233,38 @@
)

for scenario in scenario_order:
for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']}\n"
)
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
)
else:
for i in range(len(record_set["records"][scenario])):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = "NA"
output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\"\n"
)
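# Worked example of the throughput_over_local_ssd computation above
# (hypothetical numbers): if a gcsfuse scenario reaches 80 MB/s and the
# matching local-ssd epoch reaches 100 MB/s, the column records
# round(80 / 100 * 100, 2) == 80.0.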

output_file.close()