Skip to content

Commit

Permalink
Parallelize install further
Browse files Browse the repository at this point in the history
  • Loading branch information
nktpro committed Feb 3, 2025
1 parent f1f522f commit 6cd9f84
Showing 1 changed file with 142 additions and 72 deletions.
214 changes: 142 additions & 72 deletions src/actions/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { calculateDirectoryDigest, getCurrentKubeContext } from "../libs/cli_uti
import { stringifyYamlRelaxed } from "../libs/yaml_utils.ts";
import { getDefaultLogger, type Logger } from "@wok/utils/logger";
import { fetchCurrentWhitelist, type HelmetWhitelist } from "./whitelist_instance.ts";
import { arrayPartition } from "@wok/utils/collection";

export function createDigestConfigMap(
{ name, namespace, digest }: { name: string; namespace: string; digest: string },
Expand Down Expand Up @@ -221,11 +222,25 @@ export const ParamsSchema = {
const ParamsSchemaObj = Obj(ParamsSchema);
type ParamsSchema = typeof ParamsSchemaObj.infer;

export async function install(
{ source, wait, atomic, cleanupOnFail, force, createNamespace, debug, timeout, ignorePurity, logger, whitelist }:
& Omit<ParamsSchema, "_">
& { source: string; logger: Logger; whitelist: HelmetWhitelist },
): Promise<boolean> {
interface Installation {
name: string;
namespace: string;
crdsPath: string;
namespacesPath: string;
resourcesPath: string;
shouldInstallCrds: boolean;
shouldInstallNamespaces: boolean;
shouldInstallResources: boolean;
}

async function prepare(
{ source, ignorePurity, logger, whitelist }: {
source: string;
ignorePurity: boolean;
logger: Logger;
whitelist: HelmetWhitelist;
},
): Promise<Installation | null> {
const resolvedSource = resolvePath(source);
const metaPath = resolvePath(joinPath(resolvedSource, "meta.json"));
const crdsPath = joinPath(resolvedSource, "crds");
Expand All @@ -239,13 +254,17 @@ export async function install(
let newNamespacesDigest: string | undefined;
let newResourcesDigest: string | undefined;

logger.info?.(`Installing ${cyan(resolvedSource)}`);
logger.info?.(`Preparing ${cyan(resolvedSource)}`);
const metaJson = JSON.parse(await Deno.readTextFile(metaPath));
const metaValidation = validate(CompiledBundleMetaSchema, metaJson);

if (!metaValidation.isSuccess) {
logger.error?.("Failed validating compiled bundle meta.json", JSON.stringify(metaValidation.errors, null, 2));
return false;
logger.error?.(
"Failed validating compiled bundle metadata",
metaPath,
JSON.stringify(metaValidation.errors, null, 2),
);
return null;
}

const { pure, name, namespace } = metaValidation.value;
Expand All @@ -256,7 +275,7 @@ export async function install(
installLogger.error?.("Bundle instance", bold(yellow(name)), "is not whitelisted");
installLogger.error?.("Current Kubernetes context is", cyan(currentKubeContext.trim()));
installLogger.error?.("The current whitelisted set is", whitelist.set);
return false;
return null;
}

if (pure) {
Expand Down Expand Up @@ -290,55 +309,98 @@ export async function install(
}

const crdsRenderedPath = joinPath(crdsPath, "rendered");

const hasCrds = (await Array.fromAsync(expandGlob("*.yaml", { root: crdsRenderedPath }))).length > 0;
const shouldInstallCrds = hasCrds && (newCrdsDigest === undefined || newCrdsDigest !== currentCrdsDigest);
const shouldInstallNamespaces = newNamespacesDigest === undefined || newNamespacesDigest !== currentNamespacesDigest;
const shouldInstallResources = newResourcesDigest === undefined || newResourcesDigest !== currentResourcesDigest;

if (shouldInstallCrds && newCrdsDigest !== undefined) {
await Deno.writeTextFile(
joinPath(crdsRenderedPath, `digest-${newCrdsDigest}.yaml`),
stringifyYamlRelaxed(createDigestConfigMap({ name: `${name}-crds`, namespace, digest: newCrdsDigest })),
);
}

if (hasCrds) {
if (newCrdsDigest !== undefined && newCrdsDigest === currentCrdsDigest) {
installLogger.info?.("CRDs have not changed, skipping installation");
} else {
if (newCrdsDigest !== undefined) {
await Deno.writeTextFile(
joinPath(crdsRenderedPath, `digest-${newCrdsDigest}.yaml`),
stringifyYamlRelaxed(createDigestConfigMap({ name: `${name}-crds`, namespace, digest: newCrdsDigest })),
);
}

const kubectlApplyCmd = [
"kubectl",
"apply",
"--server-side",
"--force-conflicts",
"-f",
crdsRenderedPath,
];

installLogger.info?.("Executing:", cyan(kubectlApplyCmd.join(" ")));
const tag = gray(`[$ ${kubectlApplyCmd.slice(0, 2).join(" ")} ...]`);
await inheritExec({
cmd: kubectlApplyCmd,
stderr: {
read: printErrLines((line) => `${tag} ${line}`),
},
stdout: {
read: printOutLines((line) => `${tag} ${line}`),
},
});
}
if (shouldInstallNamespaces && newNamespacesDigest !== undefined) {
await Deno.writeTextFile(
joinPath(namespacesPath, "rendered", `digest-${newNamespacesDigest}.yaml`),
stringifyYamlRelaxed(
createDigestConfigMap({ name: `${name}-namespaces`, namespace, digest: newNamespacesDigest }),
),
);
}

if (shouldInstallResources && newResourcesDigest !== undefined) {
await Deno.writeTextFile(
joinPath(resourcesPath, "rendered", `digest-${newResourcesDigest}.yaml`),
stringifyYamlRelaxed(
createDigestConfigMap({ name: `${name}-resources`, namespace, digest: newResourcesDigest }),
),
);
}

if (newNamespacesDigest !== undefined && newNamespacesDigest === currentNamespacesDigest) {
installLogger.info?.("Namespaces have not changed, skipping installation");
return {
name,
namespace,
shouldInstallCrds,
shouldInstallNamespaces,
shouldInstallResources,
crdsPath,
namespacesPath,
resourcesPath,
};
}

export async function install(
{
installation: {
name,
namespace,
shouldInstallCrds,
shouldInstallNamespaces,
shouldInstallResources,
crdsPath,
namespacesPath,
resourcesPath,
},
params: {
wait,
atomic,
cleanupOnFail,
force,
createNamespace,
debug,
timeout,
},
logger,
}: { installation: Installation; params: Omit<ParamsSchema, "_">; logger: Logger },
): Promise<void> {
if (shouldInstallCrds) {
const kubectlApplyCmd = [
"kubectl",
"apply",
"--server-side",
"--force-conflicts",
"-f",
joinPath(crdsPath, "rendered"),
];

logger.info?.("Executing:", cyan(kubectlApplyCmd.join(" ")));
const tag = gray(`[$ ${kubectlApplyCmd.slice(0, 2).join(" ")} ...]`);
await inheritExec({
cmd: kubectlApplyCmd,
stderr: {
read: printErrLines((line) => `${tag} ${line}`),
},
stdout: {
read: printOutLines((line) => `${tag} ${line}`),
},
});
} else {
if (newNamespacesDigest !== undefined) {
await Deno.writeTextFile(
joinPath(namespacesPath, "rendered", `digest-${newNamespacesDigest}.yaml`),
stringifyYamlRelaxed(
createDigestConfigMap({ name: `${name}-namespaces`, namespace, digest: newNamespacesDigest }),
),
);
}
logger.info?.("Skipping CRDs, no changes detected");
}

if (shouldInstallNamespaces) {
await helmInstall({
name: `${name}-namespaces`,
namespace,
Expand All @@ -350,22 +412,13 @@ export async function install(
timeout,
createNamespace,
debug,
logger: installLogger,
logger,
});
}

if (newResourcesDigest !== undefined && newResourcesDigest === currentResourcesDigest) {
installLogger.info?.("Resources have not changed, skipping installation");
} else {
if (newResourcesDigest !== undefined) {
await Deno.writeTextFile(
joinPath(resourcesPath, "rendered", `digest-${newResourcesDigest}.yaml`),
stringifyYamlRelaxed(
createDigestConfigMap({ name: `${name}-resources`, namespace, digest: newResourcesDigest }),
),
);
}
logger.info?.("Skipping namespaces, no changes detected");
}

if (shouldInstallResources) {
await helmInstall({
name: `${name}-resources`,
namespace,
Expand All @@ -377,22 +430,39 @@ export async function install(
timeout,
createNamespace,
debug,
logger: installLogger,
logger,
});
} else {
logger.info?.("Skipping resources, no changes detected");
}

return true;
}

export default createCliAction(ParamsSchema, async ({ _: sources, ...args }) => {
export default createCliAction(ParamsSchema, async ({ _: sources, ...params }) => {
const logger = getDefaultLogger();

const whitelist = await fetchCurrentWhitelist();

for (const source of sources) {
if (!await install({ ...args, source, logger, whitelist })) {
return ExitCode.One;
const [installations, invalid] = arrayPartition(
await Promise.all(
sources.map((source) => prepare({ source, logger, whitelist, ignorePurity: params.ignorePurity })),
),
(i) => i !== null,
);

if (invalid.length > 0) {
return ExitCode.One;
}

for (const installation of installations) {
const installLogger = logger.prefixed(gray(installation.name));
if (
!installation.shouldInstallCrds && !installation.shouldInstallNamespaces && !installation.shouldInstallResources
) {
installLogger.info?.("Bundle has not changed, skipping installation");
continue;
}

await install({ params, installation, logger: installLogger });
}

return ExitCode.Zero;
Expand Down

0 comments on commit 6cd9f84

Please sign in to comment.