Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hardware parameters, secret parameters, and taxonomy repo authentication #272

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ To collaborate on this repository, please follow these steps:
source .venv/bin/activate
```

## Adding/Updating dependencies
## Adding/Updating dependencies

When updating python package dependencies in `pyproject.toml`, regenerate [requirements.txt](requirements.txt):

Expand All @@ -225,7 +225,7 @@ For this you need [pybuild-deps](https://pybuild-deps.readthedocs.io/en/latest/u
Temporarily remove `kfp-pipeline-spec` from `requirements.txt`, and run:

```bash
pybuild-deps compile requirements.txt -o requirements-build.txt
pybuild-deps compile requirements.txt -o requirements-build.txt
```

> Note that we do this because `kfp-pipeline-spec` only includes wheels and not the sources; this breaks
Expand Down
52 changes: 46 additions & 6 deletions eval/final.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ def run_final_eval_op(
sdg_path: str = "/input/sdg",
mmlu_branch_output_path: str = "/output/mmlu_branch",
mt_bench_branch_output_path: str = "/output/mt_bench_branch",
judge_secret_name: str = None,
):
import base64
import json
import os
import subprocess
from pathlib import Path

import httpx
import requests
import torch
from instructlab.eval.mmlu import MMLUBranchEvaluator
from instructlab.eval.mt_bench import MTBenchBranchEvaluator
from instructlab.model.evaluate import qa_pairs_to_qna_to_avg_scores, sort_score

judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
judge_ca_cert_path = os.getenv("JUDGE_CA_CERT_PATH")
use_tls = os.path.exists(judge_ca_cert_path) and (
os.path.getsize(judge_ca_cert_path) > 0
Expand Down Expand Up @@ -341,9 +341,49 @@ def find_node_dataset_directories(base_dir: str):

print("Starting MT_BENCH_BRANCH ...")

judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
def fetch_secret(secret_name, keys):
    """Fetch selected data keys from a Kubernetes Secret via the in-cluster API.

    Reads the pod's namespace and service-account token from their
    well-known mount paths, GETs the named Secret from the API server,
    and base64-decodes the requested data keys.

    Args:
        secret_name: Name of the Secret in the pod's own namespace.
        keys: Iterable of data keys to extract, in order.

    Returns:
        list[str]: Decoded values, one per entry in ``keys``.

    Raises:
        RuntimeError: If the service-account files are missing, the API
            call fails, or a requested key is absent from the Secret.
    """
    # Kubernetes API server inside the cluster
    K8S_API_SERVER = "https://kubernetes.default.svc"
    NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"

    # Fetch namespace
    try:
        with open(NAMESPACE_PATH, "r") as f:
            namespace = f.read().strip()
    except FileNotFoundError as err:
        # Chain the cause so the original failure isn't lost.
        raise RuntimeError("Error reading namespace") from err

    # Fetch service account token
    try:
        with open(TOKEN_PATH, "r") as f:
            token = f.read().strip()
    except FileNotFoundError as err:
        raise RuntimeError("Error reading service account token") from err

    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    # Verify TLS against the cluster CA certificate mounted into every pod.
    verify_tls = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    url = f"{K8S_API_SERVER}/api/v1/namespaces/{namespace}/secrets/{secret_name}"
    response = requests.get(url, headers=headers, verify=verify_tls)

    if response.status_code != 200:
        raise RuntimeError(
            f"Error fetching secret: {response.status_code} {response.text}"
        )

    secret_data = response.json().get("data", {})
    # Fail with a clear message instead of a bare KeyError when the Secret
    # exists but lacks one of the expected keys.
    missing = [key for key in keys if key not in secret_data]
    if missing:
        raise RuntimeError(
            f"Secret '{secret_name}' is missing expected keys: {missing}"
        )
    return [base64.b64decode(secret_data[key]).decode() for key in keys]

if judge_secret_name is None:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side note on this, we keep the previous approach for 2 reasons:

  1. we don't want to break standalone.py, and this code is leveraged there, so we need to maintain backwards compatibility
  2. we will need this again when we use the sdk to mount the secrets, and we'll get rid of the new additional calls to fetch_secret

judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
else:
print("Eval Judge secret specified, fetching...")
judge_api_key, judge_model_name, judge_endpoint = fetch_secret(
judge_secret_name, ["api_token", "model_name", "endpoint"]
)
print("Eval Judge secret data retrieved.")

output_dir = "/tmp/eval_output"

Expand Down
50 changes: 47 additions & 3 deletions eval/mt_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,62 @@ def run_mt_bench_op(
max_workers: str,
models_folder: str,
output_path: str = "/output/mt_bench_data.json",
judge_secret_name: str = None,
) -> NamedTuple("outputs", best_model=str, best_score=float):
import base64
import json
import os
import subprocess

import httpx
import requests
import torch
from instructlab.eval.mt_bench import MTBenchEvaluator

judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
def fetch_secret(secret_name, keys):
    """Fetch selected data keys from a Kubernetes Secret via the in-cluster API.

    Reads the pod's namespace and service-account token from their
    well-known mount paths, GETs the named Secret from the API server,
    and base64-decodes the requested data keys.

    Args:
        secret_name: Name of the Secret in the pod's own namespace.
        keys: Iterable of data keys to extract, in order.

    Returns:
        list[str]: Decoded values, one per entry in ``keys``.

    Raises:
        RuntimeError: If the service-account files are missing, the API
            call fails, or a requested key is absent from the Secret.
    """
    # Kubernetes API server inside the cluster
    K8S_API_SERVER = "https://kubernetes.default.svc"
    NAMESPACE_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"

    # Fetch namespace
    try:
        with open(NAMESPACE_PATH, "r") as f:
            namespace = f.read().strip()
    except FileNotFoundError as err:
        # Chain the cause so the original failure isn't lost.
        raise RuntimeError("Error reading namespace") from err

    # Fetch service account token
    try:
        with open(TOKEN_PATH, "r") as f:
            token = f.read().strip()
    except FileNotFoundError as err:
        raise RuntimeError("Error reading service account token") from err

    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    # Verify TLS against the cluster CA certificate mounted into every pod.
    verify_tls = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
    url = f"{K8S_API_SERVER}/api/v1/namespaces/{namespace}/secrets/{secret_name}"
    response = requests.get(url, headers=headers, verify=verify_tls)

    if response.status_code != 200:
        raise RuntimeError(
            f"Error fetching secret: {response.status_code} {response.text}"
        )

    secret_data = response.json().get("data", {})
    # Fail with a clear message instead of a bare KeyError when the Secret
    # exists but lacks one of the expected keys.
    missing = [key for key in keys if key not in secret_data]
    if missing:
        raise RuntimeError(
            f"Secret '{secret_name}' is missing expected keys: {missing}"
        )
    return [base64.b64decode(secret_data[key]).decode() for key in keys]

if judge_secret_name is None:
judge_api_key = os.getenv("JUDGE_API_KEY", "")
judge_model_name = os.getenv("JUDGE_NAME")
judge_endpoint = os.getenv("JUDGE_ENDPOINT")
else:
print("Eval Judge secret specified, fetching...")
judge_api_key, judge_model_name, judge_endpoint = fetch_secret(
judge_secret_name, ["api_token", "model_name", "endpoint"]
)
print("Eval Judge secret data retrieved.")

judge_ca_cert_path = os.getenv("JUDGE_CA_CERT_PATH")
use_tls = os.path.exists(judge_ca_cert_path) and (
os.path.getsize(judge_ca_cert_path) > 0
Expand Down
29 changes: 8 additions & 21 deletions pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def ilab_pipeline(
eval_judge_secret: str = "judge-secret",
# Other options
k8s_storage_class_name: str = "standard", # FIXME: https://github.com/kubeflow/pipelines/issues/11396, https://issues.redhat.com/browse/RHOAIRFE-470
k8s_storage_size: str = "100Gi",
):
"""InstructLab pipeline

Expand Down Expand Up @@ -179,13 +180,14 @@ def ilab_pipeline(
eval_judge_secret: General evaluation parameter: The name of the k8s secret key holding access credentials to the judge server.

k8s_storage_class_name: A Kubernetes StorageClass name for persistent volumes. Selected StorageClass must support RWX PersistentVolumes.
k8s_storage_size: The storage size of the persistent volume used for data passing within the pipeline.
"""

# SDG stage
sdg_input_pvc_task = CreatePVC(
pvc_name_suffix="-sdg",
access_modes=["ReadWriteMany"],
size="10Gi",
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)
git_clone_task = git_clone_op(
Expand Down Expand Up @@ -213,13 +215,10 @@ def ilab_pipeline(
repo_branch=sdg_repo_branch,
repo_pr=sdg_repo_pr,
sdg_sampling_size=sdg_sample_size,
sdg_secret_name=sdg_teacher_secret,
)
sdg_task.set_env_variable("HOME", "/tmp")
sdg_task.set_env_variable("HF_HOME", "/tmp")
use_config_map_as_env(
sdg_task, TEACHER_CONFIG_MAP, dict(endpoint="endpoint", model="model")
)
use_secret_as_env(sdg_task, TEACHER_SECRET, {"api_key": "api_key"})
use_config_map_as_volume(sdg_task, TEACHER_CONFIG_MAP, mount_path=SDG_CA_CERT_PATH)
sdg_task.set_env_variable(
SDG_CA_CERT_ENV_VAR_NAME, os.path.join(SDG_CA_CERT_PATH, SDG_CA_CERT_CM_KEY)
Expand Down Expand Up @@ -260,7 +259,7 @@ def ilab_pipeline(
model_pvc_task = CreatePVC(
pvc_name_suffix="-model-cache",
access_modes=["ReadWriteMany"],
size="100Gi",
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)

Expand Down Expand Up @@ -309,7 +308,7 @@ def ilab_pipeline(
output_pvc_task = CreatePVC(
pvc_name_suffix="-output",
access_modes=["ReadWriteMany"],
size="100Gi",
size=k8s_storage_size,
storage_class_name=k8s_storage_class_name,
)

Expand Down Expand Up @@ -380,6 +379,7 @@ def ilab_pipeline(
models_folder="/output/phase_2/model/hf_format",
max_workers=mt_bench_max_workers,
merge_system_user_message=mt_bench_merge_system_user_message,
judge_secret_name=eval_judge_secret,
)
mount_pvc(
task=run_mt_bench_task,
Expand All @@ -392,12 +392,6 @@ def ilab_pipeline(
run_mt_bench_task.set_accelerator_limit(1)
run_mt_bench_task.set_caching_options(False)
run_mt_bench_task.after(training_phase_2)
use_config_map_as_env(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have been removed to maintain backwards compatibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm following, backwards compatibility for what?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code under if judge_secret_name is None: relies on the environment variables JUDGE_ENDPOINT and JUDGE_NAME, does it not?

Copy link
Contributor Author

@HumairAK HumairAK Mar 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see what you mean, the backwards compatibility comment I left in the PR description refers to only standalone.py, which will utilize the same component code for sdg/mt_eval/final_eval but will mount the configmaps/secrets as env vars (it's a bit convoluted), for example for sdg this is done here, we want to maintain compatibility with standalone.py

from the pipeline's perspective, you provide a secret name, and we will use it, how we use it is an implementation detail

run_mt_bench_task,
JUDGE_CONFIG_MAP,
dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME"),
)
use_secret_as_env(run_mt_bench_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

use_config_map_as_volume(
run_mt_bench_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH
Expand All @@ -420,6 +414,7 @@ def ilab_pipeline(
merge_system_user_message=final_eval_merge_system_user_message,
few_shots=final_eval_few_shots,
batch_size=final_eval_batch_size,
judge_secret_name=eval_judge_secret,
)
mount_pvc(
task=final_eval_task, pvc_name=output_pvc_task.output, mount_path="/output"
Expand All @@ -435,20 +430,12 @@ def ilab_pipeline(
mount_path="/model",
)

use_config_map_as_env(
final_eval_task,
JUDGE_CONFIG_MAP,
dict(endpoint="JUDGE_ENDPOINT", model="JUDGE_NAME"),
)

final_eval_task.set_env_variable("HOME", "/tmp")
final_eval_task.set_env_variable("HF_HOME", "/tmp")

# uncomment if updating image with same tag
# set_image_pull_policy(final_eval_task, "Always")

use_secret_as_env(final_eval_task, JUDGE_SECRET, {"api_key": "JUDGE_API_KEY"})

use_config_map_as_volume(
final_eval_task, JUDGE_CONFIG_MAP, mount_path=JUDGE_CA_CERT_PATH
)
Expand Down
Loading