Skip to content

Commit

Permalink
Align chart testing logic with helm cert pipeline (#458)
Browse files Browse the repository at this point in the history
Signed-off-by: Jose R. Gonzalez <komish@flutes.dev>
  • Loading branch information
komish authored Jul 31, 2024
1 parent 7931d0d commit ccc6c49
Showing 1 changed file with 56 additions and 59 deletions.
115 changes: 56 additions & 59 deletions scripts/src/saforcharttesting/saforcharttesting.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import sys
import time
import os
import argparse
import base64
import json
import argparse
import os
import subprocess
import sys
import tempfile
import re
import time
from string import Template

namespace_template = """\
Expand All @@ -24,6 +23,17 @@
namespace: ${name}
"""

token_template = """\
apiVersion: v1
kind: Secret
type: kubernetes.io/service-account-token
metadata:
name: ${name}
namespace: ${name}
annotations:
kubernetes.io/service-account.name: ${name}
"""

role_template = """\
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
Expand Down Expand Up @@ -122,7 +132,7 @@ def apply_config(tmpl, **values):
config_path = os.path.join(tmpdir, "config.yaml")
with open(config_path, "w") as fd:
fd.write(content)
out = subprocess.run(["./oc", "apply", "-f", config_path], capture_output=True)
out = subprocess.run(["oc", "apply", "-f", config_path], capture_output=True)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
Expand All @@ -138,7 +148,7 @@ def delete_config(tmpl, **values):
config_path = os.path.join(tmpdir, "config.yaml")
with open(config_path, "w") as fd:
fd.write(content)
out = subprocess.run(["./oc", "delete", "-f", config_path], capture_output=True)
out = subprocess.run(["oc", "delete", "-f", config_path], capture_output=True)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
Expand All @@ -164,6 +174,14 @@ def create_serviceaccount(namespace):
print("[ERROR] creating ServiceAccount:", stderr)


def create_tokensecret(namespace):
print("creating token Secret:", namespace)
stdout, stderr = apply_config(token_template, name=namespace)
print("stdout:\n", stdout, sep="")
if stderr.strip():
print("[ERROR] creating token Secret:", stderr)


def create_role(namespace):
print("creating Role:", namespace)
stdout, stderr = apply_config(role_template, name=namespace)
Expand Down Expand Up @@ -223,80 +241,58 @@ def delete_clusterrolebinding(name):
sys.exit(1)


def write_sa_token(namespace, token):
secret_found = False
secrets = []
def write_sa_token(namespace, token_file):
"""Write's the service account token to token_file."""
token_found = False
for i in range(7):
# On retry, wait a little extra time before starting to give the cluster
# time to process the resources created before this.
if i > 0:
time.sleep(5)
print(f"[INFO] looking for service account token (retry {i})")
out = subprocess.run(
["./oc", "get", "serviceaccount", namespace, "-n", namespace, "-o", "json"],
["oc", "get", "secret", namespace, "-n", namespace, "-o", "json"],
capture_output=True,
)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
if stderr.strip():
print("[ERROR] retrieving ServiceAccount:", namespace, stderr)
time.sleep(10)
else:
sa = json.loads(stdout)
if len(sa["secrets"]) >= 2:
secrets = sa["secrets"]
secret_found = True
break
else:
pattern = r"Tokens:\s+([A-Za-z0-9-]+)"
dout = subprocess.run(
["./oc", "describe", "serviceaccount", namespace, "-n", namespace],
capture_output=True,
)
dstdout = dout.stdout.decode("utf-8")
match = re.search(pattern, dstdout)
if match:
token_name = match.group(1)
else:
print("[ERROR] Token not found, Exiting")
sys.exit(1)
secrets.append({"name": token_name})
secret_found = True
break
time.sleep(10)
print("[ERROR] retrieving token secret:", namespace, stderr)
continue

if not secret_found:
print("[ERROR] retrieving ServiceAccount:", namespace, stderr)
sys.exit(1)
secret = json.loads(stdout)
token = secret.get("data", {}).get("token", None)

for secret in secrets:
out = subprocess.run(
["./oc", "get", "secret", secret["name"], "-n", namespace, "-o", "json"],
capture_output=True,
if not token:
print("[ERROR] token not yet found in secret:", namespace)
continue

token_found = True
break

if not token_found:
print(
"[ERROR] all attempts to find service account token have failed:", namespace
)
stdout = out.stdout.decode("utf-8")
if out.returncode != 0:
stderr = out.stderr.decode("utf-8")
if stderr.strip():
print("[ERROR] retrieving secret:", secret["name"], stderr)
continue
else:
sec = json.loads(stdout)
if sec["type"] == "kubernetes.io/service-account-token":
content = sec["data"]["token"]
with open(token, "w") as fd:
fd.write(base64.b64decode(content).decode("utf-8"))
sys.exit(1)

with open(token_file, "w") as fd:
fd.write(base64.b64decode(token).decode("utf-8"))


def switch_project_context(namespace, token, api_server):
tkn = open(token).read()
for i in range(7):
out = subprocess.run(
["./oc", "login", "--token", tkn, "--server", api_server],
capture_output=True,
["oc", "login", "--token", tkn, "--server", api_server], capture_output=True
)
stdout = out.stdout.decode("utf-8")
print(stdout)
out = subprocess.run(["./oc", "project", namespace], capture_output=True)
out = subprocess.run(["oc", "project", namespace], capture_output=True)
stdout = out.stdout.decode("utf-8")
print(stdout)
out = subprocess.run(["./oc", "config", "current-context"], capture_output=True)
out = subprocess.run(["oc", "config", "current-context"], capture_output=True)
stdout = out.stdout.decode("utf-8").strip()
print(stdout)
if stdout.endswith(":".join((namespace, namespace))):
Expand Down Expand Up @@ -349,6 +345,7 @@ def main():
if args.create:
create_namespace(args.create)
create_serviceaccount(args.create)
create_tokensecret(args.create)
create_role(args.create)
create_rolebinding(args.create)
create_clusterrole(args.create)
Expand Down

0 comments on commit ccc6c49

Please sign in to comment.