From 70c867be9b48a5dcf0ea3a9bc05c7da2b5d9cb70 Mon Sep 17 00:00:00 2001 From: Ole-Martin Bratteng <1681525+omBratteng@users.noreply.github.com> Date: Thu, 12 Dec 2024 19:14:55 +0100 Subject: [PATCH] refactor: migrate debezium offset over to gcs (#470) --- src/debezium.ts | 154 ++++++++++++++++++++++++++++++--------------- src/k8s.ts | 13 +++- src/suite/index.ts | 13 +++- src/utils.ts | 7 ++- 4 files changed, 135 insertions(+), 52 deletions(-) diff --git a/src/debezium.ts b/src/debezium.ts index ccc3674..9d2eab2 100644 --- a/src/debezium.ts +++ b/src/debezium.ts @@ -6,7 +6,11 @@ import { Input, Output, ProviderResource, interpolate } from '@pulumi/pulumi'; import * as pulumi from '@pulumi/pulumi'; import { input as inputs } from '@pulumi/kubernetes/types'; import { stripCpuFromLimits } from './utils'; -import { getSpotSettings, PodResources } from './k8s'; +import { + getSpotSettings, + k8sServiceAccountToWorkloadPrincipal, + PodResources, +} from './k8s'; type OptionalArgs = { diskType?: Input; @@ -76,6 +80,69 @@ export function deployDebeziumSharedDependencies( return { debeziumSa, debeziumKey, disk }; } +export const deployDebeziumSharedDependenciesV2 = ( + { + name, + namespace, + isAdhocEnv = false, + resourcePrefix, + }: { + name: string; + namespace: string; + isAdhocEnv?: boolean; + resourcePrefix: string; + }, + provider?: k8s.Provider, +): k8s.core.v1.ServiceAccount | undefined => { + const serviceAccount = new k8s.core.v1.ServiceAccount( + `${resourcePrefix}${name}-debezium-k8s-sa`, + { + metadata: { + namespace, + name: `${name}-debezium`, + }, + }, + { provider }, + ); + + if (!isAdhocEnv) { + const bucket = new gcp.storage.Bucket( + `${resourcePrefix}${name}-debezium-storage`, + { + name: `${name}-debezium-storage`, + location: 'us', + publicAccessPrevention: 'enforced', + project: 'devkit-prod', + forceDestroy: true, + uniformBucketLevelAccess: true, + }, + ); + + const objectUsers = gcp.organizations.getIAMPolicy({ + bindings: [ + { + role: 'roles/storage.objectUser', + members: [ + k8sServiceAccountToWorkloadPrincipal( + serviceAccount, + ) as unknown as string, + ], + }, + ], + }); + + new gcp.storage.BucketIAMPolicy( + `${resourcePrefix}${name}-debezium-storage-policy`, + { + bucket: bucket.name, + policyData: objectUsers.then((objectUser) => objectUser.policyData), + }, + ); + } + + return serviceAccount; +}; + /** * Deploys only the Kubernetes resources for Debezium */ @@ -97,6 +164,7 @@ export function deployDebeziumKubernetesResources( isAdhocEnv, disableHealthCheck, affinity, + k8sServiceAccount, }: Pick< OptionalArgs, | 'limits' @@ -107,7 +175,7 @@ export function deployDebeziumKubernetesResources( | 'isAdhocEnv' | 'disableHealthCheck' | 'affinity' - > = {}, + > & { k8sServiceAccount?: k8s.core.v1.ServiceAccount } = {}, ): void { const propsHash = debeziumPropsString.apply((props) => createHash('md5').update(props).digest('hex'), @@ -225,15 +293,37 @@ export function deployDebeziumKubernetesResources( claimName: `${name}-debezium-pvc`, }, }); - volumeMounts.push({ name: 'storage', mountPath: '/debezium/data' }); + volumeMounts.push({ name: 'storage', mountPath: '/pvc/data' }); initContainers.push({ - name: 'data-ownership', + name: 'copy-offsets', image: 'alpine:3', - command: ['chmod', '777', '/debezium/data'], - volumeMounts: [{ name: 'storage', mountPath: '/debezium/data' }], + command: [ + 'sh', + '-c', + '[ -f /pvc/data/offsets.dat ] && mv /pvc/data/offsets.dat /debezium/data/offsets.dat || true', + ], + volumeMounts: [ + { name: 'storage', mountPath: '/pvc/data' }, + { name: 'gcs-fuse-csi-ephemeral', mountPath: '/debezium/data' }, + ], }); } + volumes.push({ + name: 'gcs-fuse-csi-ephemeral', + csi: { + driver: 'gcsfuse.csi.storage.gke.io', + volumeAttributes: { + bucketName: `${name}-debezium-storage`, + mountOptions: 'implicit-dirs,uid=185,gid=0', + }, + }, + }); + volumeMounts.push({ + name: 'gcs-fuse-csi-ephemeral', + mountPath: '/debezium/data', + }); + let livenessProbe: k8s.types.input.core.v1.Probe | undefined; if (!disableHealthCheck) { livenessProbe = { @@ -261,6 +351,12 @@ export function deployDebeziumKubernetesResources( template: { metadata: { labels: { ...labels, props: propsHash }, + annotations: { + 'gke-gcsfuse/volumes': 'true', + 'gke-gcsfuse/memory-limit': '128Mi', + 'gke-gcsfuse/cpu-request': '50m', + 'gke-gcsfuse/memory-request': '32Mi', + }, }, spec: { nodeSelector: disk @@ -270,6 +366,9 @@ export function deployDebeziumKubernetesResources( initContainers, affinity: !isAdhocEnv ? affinity : undefined, tolerations, + serviceAccountName: k8sServiceAccount?.metadata.apply( + (metadata) => metadata.name, + ), containers: [ { name: 'debezium', @@ -299,46 +398,3 @@ export function deployDebeziumKubernetesResources( { provider }, ); } - -export function deployDebeziumWithDependencies( - name: string, - namespace: string | Input, - debeziumPropsString: Output, - diskZone: Input, - { - diskType, - diskSize, - limits = { - cpu: '1', - memory: '1024Mi', - }, - env = [], - image = 'debezium/server:1.6', - resourcePrefix = '', - provider, - isAdhocEnv, - disableHealthCheck, - }: OptionalArgs = {}, -): void { - const { debeziumKey, disk } = deployDebeziumSharedDependencies( - name, - diskZone, - { diskType, diskSize, resourcePrefix, isAdhocEnv }, - ); - deployDebeziumKubernetesResources( - name, - namespace, - debeziumPropsString, - debeziumKey, - disk, - { - limits, - env, - image, - resourcePrefix, - provider, - isAdhocEnv, - disableHealthCheck, - }, - ); -} diff --git a/src/k8s.ts b/src/k8s.ts index 7cfb42d..11a2eab 100644 --- a/src/k8s.ts +++ b/src/k8s.ts @@ -9,7 +9,7 @@ import { import * as gcp from '@pulumi/gcp'; import { autoscaling } from '@pulumi/kubernetes/types/input'; import { Resource } from '@pulumi/pulumi/resource'; -import { camelToUnderscore } from './utils'; +import { camelToUnderscore, gcpProjectNumber } from './utils'; import { NodeLabels } from './kubernetes'; import { defaultSpotWeight } from './constants'; @@ -28,6 +28,14 @@ export function k8sServiceAccountToIdentity( ); } +export const k8sServiceAccountToWorkloadPrincipal = ( + serviceAccount: k8s.core.v1.ServiceAccount, +): Output => + serviceAccount.metadata.apply( + (metadata) => + `principal://iam.googleapis.com/projects/${gcpProjectNumber()}/locations/global/workloadIdentityPools/${gcp.config.project}.svc.id.goog/subject/ns/${metadata.namespace}/sa/${metadata.name}`, + ); + export function createK8sServiceAccountFromGCPServiceAccount( resourceName: string, name: string, @@ -171,6 +179,9 @@ export const getMemoryAndCpuMetrics = ( }, ]; +/** + * @deprecated Use createAndBindK8sServiceAccount instead + */ export const bindK8sServiceAccountToGCP = ( resourcePrefix: string, name: string, diff --git a/src/suite/index.ts b/src/suite/index.ts index 730afaf..a7b78f7 100644 --- a/src/suite/index.ts +++ b/src/suite/index.ts @@ -30,6 +30,7 @@ import { import { deployDebeziumKubernetesResources, deployDebeziumSharedDependencies, + deployDebeziumSharedDependenciesV2, } from '../debezium'; import { location } from '../config'; import { stripCpuFromLimits } from '../utils'; @@ -73,7 +74,7 @@ export function customMetricToK8s( } } -function createAndBindK8sServiceAccount( +export function createAndBindK8sServiceAccount( resourcePrefix: string | undefined, name: string, namespace: string, @@ -458,6 +459,15 @@ export function deployApplicationSuiteToProvider({ isAdhocEnv, }, ); + const k8sServiceAccount = deployDebeziumSharedDependenciesV2( + { + name, + namespace, + resourcePrefix, + isAdhocEnv, + }, + provider, + ); // Useful if we want to migrate Debezium without affecting its dependencies if (!debezium.dependenciesOnly) { const debeziumDefault = isAdhocEnv ? '2.0' : '1.6'; @@ -476,6 +486,7 @@ export function deployApplicationSuiteToProvider({ env: debezium.env, disableHealthCheck: debezium.disableHealthCheck, affinity: debezium.affinity, + k8sServiceAccount, }, ); } diff --git a/src/utils.ts b/src/utils.ts index 0a16b7a..2cbaff8 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,4 @@ -import { all, Input, Output } from '@pulumi/pulumi'; +import { all, Config, Input, Output } from '@pulumi/pulumi'; import { PodResources } from './k8s'; export type AdhocEnv = { isAdhocEnv: boolean }; @@ -22,3 +22,8 @@ export function stripCpuFromLimits( // eslint-disable-next-line @typescript-eslint/no-unused-vars return all([requests]).apply(([{ cpu, ...rest }]) => rest); } + +export const gcpProjectNumber = () => { + const __config = new Config(); + return __config.require('projectNumber'); +};