-
Notifications
You must be signed in to change notification settings - Fork 245
/
Copy pathdeploy_and_benchmark.py
424 lines (357 loc) · 18.6 KB
/
deploy_and_benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import argparse
import copy
import os
import re
import shutil
import subprocess
import sys
import yaml
from benchmark import run_benchmark
def read_yaml(file_path):
    """Parse a YAML file and return its contents, or None if it cannot be read.

    Args:
        file_path: Path to the YAML file on disk.

    Returns:
        The deserialized document (typically a dict), or None when opening
        or parsing fails (the error is printed rather than raised).
    """
    content = None
    try:
        with open(file_path, "r") as stream:
            content = yaml.safe_load(stream)
    except Exception as e:
        # Best-effort loader: report the problem and let the caller
        # decide what to do with the None result.
        print(f"Error reading YAML file: {e}")
    return content
def construct_deploy_config(deploy_config, target_node, batch_param_value=None, test_mode="oob"):
    """Construct a new deploy config based on the target node number and optional batch parameter value.

    The original config may hold per-node arrays (e.g. ``replicaCount``) and
    "enabled"-gated tuning sections; this flattens them into concrete single
    values for one deployment of ``target_node`` nodes.

    Args:
        deploy_config: Original deploy config dictionary
        target_node: Target node number to match in the node array
        batch_param_value: Optional specific batch parameter value to use
        test_mode: Test mode, either 'oob' or 'tune'

    Returns:
        A new deploy config with single values for node and instance_num

    Raises:
        ValueError: if ``deploy_config['node']`` is not a list, if
            ``target_node`` is not in that list, or if a ``replicaCount``
            array is shorter than the node array.
    """
    # Deep copy the original config to avoid modifying it
    new_config = copy.deepcopy(deploy_config)
    # Get the node array and validate
    nodes = deploy_config.get("node")
    if not isinstance(nodes, list):
        raise ValueError("deploy_config['node'] must be an array")
    # Find the index of the target node; this index selects the matching
    # entry from any per-node replicaCount arrays below.
    try:
        node_index = nodes.index(target_node)
    except ValueError:
        raise ValueError(f"Target node {target_node} not found in node array {nodes}")
    # Set the single node value
    new_config["node"] = target_node
    # First determine which llm replicaCount to use based on teirerank.enabled
    # (defaults to enabled when the teirerank service/flag is absent).
    services = new_config.get("services", {})
    teirerank_enabled = services.get("teirerank", {}).get("enabled", True)
    # Process each service's configuration (mutates the deep copy in place)
    for service_name, service_config in services.items():
        # Handle replicaCount
        if "replicaCount" in service_config:
            if service_name == "llm" and isinstance(service_config["replicaCount"], dict):
                replica_counts = service_config["replicaCount"]
                # NOTE(review): direct indexing raises KeyError if the
                # 'with_teirerank'/'without_teirerank' key is missing — the
                # config schema presumably guarantees both keys; confirm.
                service_config["replicaCount"] = (
                    replica_counts["with_teirerank"] if teirerank_enabled else replica_counts["without_teirerank"]
                )
            # A list replicaCount is per-node: pick the entry matching
            # target_node's position in the node array.
            if isinstance(service_config["replicaCount"], list):
                if len(service_config["replicaCount"]) < len(nodes):
                    raise ValueError(
                        f"replicaCount array length ({len(service_config['replicaCount'])}) for service {service_name} "
                        f"smaller than node array length ({len(nodes)})"
                    )
                service_config["replicaCount"] = service_config["replicaCount"][node_index]
        # Handle resources based on test_mode: tune mode always keeps them;
        # oob mode keeps them only when explicitly enabled.
        if "resources" in service_config:
            resources = service_config["resources"]
            if test_mode == "tune" or resources.get("enabled", False):
                # Keep resource configuration but remove enabled field
                resources.pop("enabled", None)
            else:
                # Remove resource configuration in OOB mode when disabled
                service_config.pop("resources")
        # Handle model parameters for LLM service
        if service_name == "llm" and "model_params" in service_config:
            model_params = service_config["model_params"]
            engine = service_config.get("engine", "tgi")
            # Get engine-specific parameters
            engine_params = model_params.get(engine, {})
            # Handle batch parameters (same keep/drop gating as resources)
            if "batch_params" in engine_params:
                batch_params = engine_params["batch_params"]
                if test_mode == "tune" or batch_params.get("enabled", False):
                    # Keep batch parameters configuration but remove enabled field
                    batch_params.pop("enabled", None)
                    # Update batch parameter value if specified; the key name
                    # is engine-specific (tgi vs vllm), stored as a string.
                    if batch_param_value is not None:
                        if engine == "tgi":
                            batch_params["max_batch_size"] = str(batch_param_value)
                        elif engine == "vllm":
                            batch_params["max_num_seqs"] = str(batch_param_value)
                else:
                    engine_params.pop("batch_params")
            # Handle token parameters
            if "token_params" in engine_params:
                token_params = engine_params["token_params"]
                if test_mode == "tune" or token_params.get("enabled", False):
                    # Keep token parameters configuration but remove enabled field
                    token_params.pop("enabled", None)
                else:
                    # Remove token parameters in OOB mode when disabled
                    engine_params.pop("token_params")
            # Update model_params with engine-specific parameters only
            # (drops any other engines' sections from the copy).
            model_params.clear()
            model_params[engine] = engine_params
            # Remove model_params if empty or if engine_params is empty
            if not model_params or not engine_params:
                service_config.pop("model_params")
    return new_config
def pull_helm_chart(chart_pull_url, version, chart_name):
    """Pull a Helm chart from an OCI registry and untar it into the CWD.

    Args:
        chart_pull_url: OCI URL of the chart (e.g. oci://ghcr.io/...).
        version: Chart version to pull.
        chart_name: Chart name; the untarred directory is expected to be
            ``./<chart_name>``.

    Returns:
        Absolute path to the untarred chart directory, or None when the
        pull fails or the expected directory does not appear.
    """
    # Pull and untar the chart. A failed helm invocation previously raised
    # CalledProcessError out of this function; callers already handle a
    # None return, so report and return None instead of crashing.
    try:
        subprocess.run(["helm", "pull", chart_pull_url, "--version", version, "--untar"], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Error: Failed to pull Helm chart {chart_pull_url} (version {version}): {e}")
        return None
    current_dir = os.getcwd()
    untar_dir = os.path.join(current_dir, chart_name)
    if not os.path.isdir(untar_dir):
        print(f"Error: Could not find untarred directory for {chart_name}")
        return None
    return untar_dir
def main(yaml_file, target_node=None, test_mode="oob"):
    """Main function to process deployment configuration.

    For each node count, this: labels the nodes, deploys (or updates) the
    chart once per batch-parameter value, waits for readiness, runs the
    benchmark, then uninstalls and removes the labels. All deploy actions
    are delegated to ``deploy.py`` via subprocess.

    Args:
        yaml_file: Path to the YAML configuration file
        target_node: Optional target number of nodes to deploy. If not specified, will process all nodes.
        test_mode: Test mode, either "oob" (out of box) or "tune". Defaults to "oob".
    """
    if test_mode not in ["oob", "tune"]:
        print("Error: test_mode must be either 'oob' or 'tune'")
        return None
    config = read_yaml(yaml_file)
    if config is None:
        print("Failed to read YAML file.")
        return None
    deploy_config = config["deploy"]
    benchmark_config = config["benchmark"]
    # Extract chart name from the YAML file name
    # (last underscore-separated token of the basename, e.g. "foo_chatqna.yaml" -> "chatqna")
    chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split("_")[-1]
    print(f"chart_name: {chart_name}")
    # Use the current interpreter for all deploy.py invocations.
    python_cmd = sys.executable
    # Process nodes
    nodes = deploy_config.get("node", [])
    if not isinstance(nodes, list):
        print("Error: deploy_config['node'] must be an array")
        return None
    nodes_to_process = [target_node] if target_node is not None else nodes
    node_names = deploy_config.get("node_name", [])
    namespace = deploy_config.get("namespace", "default")
    # Pull the Helm chart
    chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}"
    version = deploy_config.get("version", "0-latest")
    chart_dir = pull_helm_chart(chart_pull_url, version, chart_name)
    if not chart_dir:
        return
    for node in nodes_to_process:
        try:
            print(f"\nProcessing configuration for {node} nodes...")
            # Get corresponding node names for this node count
            # (takes the first `node` names from the configured list)
            current_node_names = node_names[:node] if node_names else []
            # Add labels for current node configuration
            print(f"Adding labels for {node} nodes...")
            cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--add-label"]
            if current_node_names:
                cmd.extend(["--node-names"] + current_node_names)
            result = subprocess.run(cmd, check=True)
            if result.returncode != 0:
                print(f"Failed to add labels for {node} nodes")
                continue
            try:
                # Process batch parameters based on engine type
                services = deploy_config.get("services", {})
                llm_config = services.get("llm", {})
                if "model_params" in llm_config:
                    model_params = llm_config["model_params"]
                    engine = llm_config.get("engine", "tgi")
                    # Get engine-specific parameters
                    engine_params = model_params.get(engine, {})
                    # Handle batch parameters
                    batch_params = []
                    if "batch_params" in engine_params:
                        key = "max_batch_size" if engine == "tgi" else "max_num_seqs"
                        batch_params = engine_params["batch_params"].get(key, [])
                        param_name = key
                        if not isinstance(batch_params, list):
                            batch_params = [batch_params]
                        # Skip multiple iterations if batch parameter is empty
                        if batch_params == [""] or not batch_params:
                            batch_params = [None]
                else:
                    # NOTE(review): when model_params exists but has no
                    # batch_params, param_name appears to stay unbound and
                    # batch_params stays [] — the loop below would not run
                    # and later f-strings could raise NameError; confirm
                    # whether configs always include batch_params.
                    batch_params = [None]
                    param_name = "batch_param"
                # Get timeout and interval from deploy config for check-ready
                timeout = deploy_config.get("timeout", 1000)  # default 1000s
                interval = deploy_config.get("interval", 5)  # default 5s
                # values_file_path is captured from the first deploy's stdout
                # and reused by subsequent update-service iterations.
                values_file_path = None
                # Create benchmark output directory
                benchmark_dir = os.path.join(os.getcwd(), "benchmark_output")
                os.makedirs(benchmark_dir, exist_ok=True)
                for i, batch_param in enumerate(batch_params):
                    print(f"\nProcessing {test_mode} mode {param_name}: {batch_param}")
                    # Create subdirectory for this iteration with test mode in the name
                    iteration_dir = os.path.join(
                        benchmark_dir,
                        f"benchmark_{test_mode}_node{node}_batch{batch_param if batch_param is not None else 'default'}",
                    )
                    os.makedirs(iteration_dir, exist_ok=True)
                    # Construct new deploy config
                    new_deploy_config = construct_deploy_config(deploy_config, node, batch_param, test_mode)
                    # Write the new deploy config to a temporary file
                    temp_config_file = (
                        f"temp_deploy_config_{node}.yaml"
                        if batch_param is None
                        else f"temp_deploy_config_{node}_{batch_param}.yaml"
                    )
                    try:
                        with open(temp_config_file, "w") as f:
                            yaml.dump(new_deploy_config, f)
                        if i == 0:
                            # First iteration: full deployment
                            cmd = [
                                python_cmd,
                                "deploy.py",
                                "--deploy-config",
                                temp_config_file,
                                "--chart-name",
                                chart_name,
                                "--namespace",
                                namespace,
                                "--chart-dir",
                                chart_dir,
                            ]
                            result = subprocess.run(cmd, check=True, capture_output=True, text=True)
                            # deploy.py prints the generated values file path;
                            # scrape it from stdout for reuse and archiving.
                            match = re.search(r"values_file_path: (\S+)", result.stdout)
                            if match:
                                values_file_path = match.group(1)
                                print(f"Captured values_file_path: {values_file_path}")
                                # Copy values file to iteration directory
                                shutil.copy2(values_file_path, iteration_dir)
                            else:
                                print("values_file_path not found in the output")
                        else:
                            # Subsequent iterations: update services with config change
                            cmd = [
                                python_cmd,
                                "deploy.py",
                                "--deploy-config",
                                temp_config_file,
                                "--chart-name",
                                chart_name,
                                "--namespace",
                                namespace,
                                "--chart-dir",
                                chart_dir,
                                "--user-values",
                                values_file_path,
                                "--update-service",
                            ]
                            result = subprocess.run(cmd, check=True, capture_output=True, text=True)
                            if result.returncode != 0:
                                print(f"Update failed for {node} nodes configuration with {param_name} {batch_param}")
                                break  # Skip remaining {param_name} for this node
                            # Update values_file_path from the output
                            match = re.search(r"values_file_path: (\S+)", result.stdout)
                            if match:
                                values_file_path = match.group(1)
                                print(f"Updated values_file_path: {values_file_path}")
                                # Copy values file to iteration directory
                                shutil.copy2(values_file_path, iteration_dir)
                            else:
                                print("values_file_path not found in the output")
                        # Wait for deployment to be ready
                        print("\nWaiting for deployment to be ready...")
                        cmd = [
                            python_cmd,
                            "deploy.py",
                            "--chart-name",
                            chart_name,
                            "--namespace",
                            namespace,
                            "--check-ready",
                            "--timeout",
                            str(timeout),
                            "--interval",
                            str(interval),
                        ]
                        try:
                            result = subprocess.run(
                                cmd, check=False
                            )  # Changed to check=False to handle return code manually
                            if result.returncode == 0:
                                print("Deployments are ready!")
                                # Run benchmark only if deployment is ready
                                run_benchmark(
                                    benchmark_config=benchmark_config,
                                    chart_name=chart_name,
                                    namespace=namespace,
                                    node_num=node,
                                    llm_model=deploy_config.get("services", {}).get("llm", {}).get("model_id", ""),
                                    output_dir=iteration_dir,
                                )
                            else:
                                print(
                                    f"Deployments are not ready after timeout period during "
                                    f"{'deployment' if i == 0 else 'update'} for {node} nodes. "
                                    f"Skipping remaining iterations."
                                )
                                break  # Exit the batch parameter loop for current node
                        except subprocess.CalledProcessError as e:
                            print(f"Error while checking deployment status: {str(e)}")
                            break  # Exit the batch parameter loop for current node
                    except Exception as e:
                        print(
                            f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with {param_name} {batch_param}: {str(e)}"
                        )
                        break  # Skip remaining {param_name} for this node
                    finally:
                        # Clean up the temporary file
                        if os.path.exists(temp_config_file):
                            os.remove(temp_config_file)
            finally:
                # Always tear down, even when an iteration failed above.
                # Uninstall the deployment
                print(f"\nUninstalling deployment for {node} nodes...")
                cmd = [
                    python_cmd,
                    "deploy.py",
                    "--chart-name",
                    chart_name,
                    "--namespace",
                    namespace,
                    "--uninstall",
                ]
                try:
                    result = subprocess.run(cmd, check=True)
                    if result.returncode != 0:
                        print(f"Failed to uninstall deployment for {node} nodes")
                except Exception as e:
                    print(f"Error while uninstalling deployment for {node} nodes: {str(e)}")
                # Delete labels for current node configuration
                print(f"Deleting labels for {node} nodes...")
                cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--delete-label"]
                if current_node_names:
                    cmd.extend(["--node-names"] + current_node_names)
                try:
                    result = subprocess.run(cmd, check=True)
                    if result.returncode != 0:
                        print(f"Failed to delete labels for {node} nodes")
                except Exception as e:
                    print(f"Error while deleting labels for {node} nodes: {str(e)}")
        except Exception as e:
            # Per-node catch-all: a failure on one node count should not
            # abort the remaining node counts.
            print(f"Error processing configuration for {node} nodes: {str(e)}")
            continue
    # Cleanup: Remove the untarred directory
    if chart_dir and os.path.isdir(chart_dir):
        print(f"Removing temporary directory: {chart_dir}")
        shutil.rmtree(chart_dir)
        print("Temporary directory removed successfully.")
if __name__ == "__main__":
    # Command-line entry point: parse arguments and delegate to main().
    cli = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.")
    cli.add_argument("yaml_file", help="Path to the YAML configuration file")
    cli.add_argument("--target-node", default=None, type=int, help="Optional: Target number of nodes to deploy.")
    cli.add_argument("--test-mode", default="oob", type=str, help="Test mode, either 'oob' (out of box) or 'tune'.")
    opts = cli.parse_args()
    main(opts.yaml_file, opts.target_node, opts.test_mode)