Skip to content

Commit

Permalink
refactor: migrate debezium offset over to gcs (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
omBratteng authored Dec 12, 2024
1 parent fdeae7a commit 70c867b
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 52 deletions.
154 changes: 105 additions & 49 deletions src/debezium.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { Input, Output, ProviderResource, interpolate } from '@pulumi/pulumi';
import * as pulumi from '@pulumi/pulumi';
import { input as inputs } from '@pulumi/kubernetes/types';
import { stripCpuFromLimits } from './utils';
import { getSpotSettings, PodResources } from './k8s';
import {
getSpotSettings,
k8sServiceAccountToWorkloadPrincipal,
PodResources,
} from './k8s';

type OptionalArgs = {
diskType?: Input<string>;
Expand Down Expand Up @@ -76,6 +80,69 @@ export function deployDebeziumSharedDependencies(
return { debeziumSa, debeziumKey, disk };
}

/**
 * Provisions the identity and storage Debezium uses to keep its data in GCS.
 *
 * A Kubernetes service account named `${name}-debezium` is always created.
 * Unless `isAdhocEnv` is set, a `${name}-debezium-storage` bucket is created as
 * well, and its IAM policy is set so that the service account's workload
 * principal is granted `roles/storage.objectUser`.
 *
 * @param options - `name` and `namespace` of the application, the
 *   `resourcePrefix` prepended to every Pulumi resource name, and `isAdhocEnv`
 *   (defaults to `false`) which skips the bucket and its IAM policy.
 * @param provider - Kubernetes provider used for the service account.
 * @returns The Kubernetes service account; the current implementation always
 *   returns one, although the declared type still allows `undefined`.
 */
export const deployDebeziumSharedDependenciesV2 = (
  {
    name,
    namespace,
    isAdhocEnv = false,
    resourcePrefix,
  }: {
    name: string;
    namespace: string;
    isAdhocEnv?: boolean;
    resourcePrefix: string;
  },
  provider?: k8s.Provider,
): k8s.core.v1.ServiceAccount | undefined => {
  const serviceAccount = new k8s.core.v1.ServiceAccount(
    `${resourcePrefix}${name}-debezium-k8s-sa`,
    {
      metadata: {
        namespace,
        name: `${name}-debezium`,
      },
    },
    { provider },
  );

  if (!isAdhocEnv) {
    const bucket = new gcp.storage.Bucket(
      `${resourcePrefix}${name}-debezium-storage`,
      {
        name: `${name}-debezium-storage`,
        location: 'us',
        publicAccessPrevention: 'enforced',
        project: 'devkit-prod',
        forceDestroy: true,
        uniformBucketLevelAccess: true,
      },
    );

    // The workload principal is an Output<string>. The output-form invoke
    // accepts it as-is, so no `as unknown as string` cast is needed to get it
    // past the plain-form signature.
    const objectUsers = gcp.organizations.getIAMPolicyOutput({
      bindings: [
        {
          role: 'roles/storage.objectUser',
          members: [k8sServiceAccountToWorkloadPrincipal(serviceAccount)],
        },
      ],
    });

    new gcp.storage.BucketIAMPolicy(
      `${resourcePrefix}${name}-debezium-storage-policy`,
      {
        bucket: bucket.name,
        policyData: objectUsers.policyData,
      },
    );
  }

  return serviceAccount;
};

/**
* Deploys only the Kubernetes resources for Debezium
*/
Expand All @@ -97,6 +164,7 @@ export function deployDebeziumKubernetesResources(
isAdhocEnv,
disableHealthCheck,
affinity,
k8sServiceAccount,
}: Pick<
OptionalArgs,
| 'limits'
Expand All @@ -107,7 +175,7 @@ export function deployDebeziumKubernetesResources(
| 'isAdhocEnv'
| 'disableHealthCheck'
| 'affinity'
> = {},
> & { k8sServiceAccount?: k8s.core.v1.ServiceAccount } = {},
): void {
const propsHash = debeziumPropsString.apply((props) =>
createHash('md5').update(props).digest('hex'),
Expand Down Expand Up @@ -225,15 +293,37 @@ export function deployDebeziumKubernetesResources(
claimName: `${name}-debezium-pvc`,
},
});
volumeMounts.push({ name: 'storage', mountPath: '/debezium/data' });
volumeMounts.push({ name: 'storage', mountPath: '/pvc/data' });
initContainers.push({
name: 'data-ownership',
name: 'copy-offsets',
image: 'alpine:3',
command: ['chmod', '777', '/debezium/data'],
volumeMounts: [{ name: 'storage', mountPath: '/debezium/data' }],
command: [
'sh',
'-c',
'[ -f /pvc/data/offsets.dat ] && mv /pvc/data/offsets.dat /debezium/data/offsets.dat || true',
],
volumeMounts: [
{ name: 'storage', mountPath: '/pvc/data' },
{ name: 'gcs-fuse-csi-ephemeral', mountPath: '/debezium/data' },
],
});
}

volumes.push({
name: 'gcs-fuse-csi-ephemeral',
csi: {
driver: 'gcsfuse.csi.storage.gke.io',
volumeAttributes: {
bucketName: `${name}-debezium-storage`,
mountOptions: 'implicit-dirs,uid=185,gid=0',
},
},
});
volumeMounts.push({
name: 'gcs-fuse-csi-ephemeral',
mountPath: '/debezium/data',
});

let livenessProbe: k8s.types.input.core.v1.Probe | undefined;
if (!disableHealthCheck) {
livenessProbe = {
Expand Down Expand Up @@ -261,6 +351,12 @@ export function deployDebeziumKubernetesResources(
template: {
metadata: {
labels: { ...labels, props: propsHash },
annotations: {
'gke-gcsfuse/volumes': 'true',
'gke-gcsfuse/memory-limit': '128Mi',
'gke-gcsfuse/cpu-request': '50m',
'gke-gcsfuse/memory-request': '32Mi',
},
},
spec: {
nodeSelector: disk
Expand All @@ -270,6 +366,9 @@ export function deployDebeziumKubernetesResources(
initContainers,
affinity: !isAdhocEnv ? affinity : undefined,
tolerations,
serviceAccountName: k8sServiceAccount?.metadata.apply(
(metadata) => metadata.name,
),
containers: [
{
name: 'debezium',
Expand Down Expand Up @@ -299,46 +398,3 @@ export function deployDebeziumKubernetesResources(
{ provider },
);
}

/**
 * Deploys Debezium end to end: first the shared dependencies, then the
 * Kubernetes resources that consume the key and disk those dependencies
 * produce.
 *
 * @param name - Application name forwarded to both deployment steps.
 * @param namespace - Namespace forwarded to the Kubernetes resources.
 * @param debeziumPropsString - Debezium properties forwarded to the Kubernetes
 *   resources.
 * @param diskZone - Zone forwarded to the shared dependencies.
 * @param options - Optional overrides. When omitted, `limits` defaults to
 *   1 CPU / 1024Mi, `env` to an empty list, `image` to `debezium/server:1.6`
 *   and `resourcePrefix` to an empty string.
 */
export function deployDebeziumWithDependencies(
  name: string,
  namespace: string | Input<string>,
  debeziumPropsString: Output<string>,
  diskZone: Input<string>,
  options: OptionalArgs = {},
): void {
  const {
    diskType,
    diskSize,
    limits = {
      cpu: '1',
      memory: '1024Mi',
    },
    env = [],
    image = 'debezium/server:1.6',
    resourcePrefix = '',
    provider,
    isAdhocEnv,
    disableHealthCheck,
  } = options;

  // Step 1: shared dependencies; only the key and the disk are needed below.
  const dependencyOptions = { diskType, diskSize, resourcePrefix, isAdhocEnv };
  const shared = deployDebeziumSharedDependencies(
    name,
    diskZone,
    dependencyOptions,
  );

  // Step 2: the Kubernetes resources built on top of those dependencies.
  const kubernetesOptions = {
    limits,
    env,
    image,
    resourcePrefix,
    provider,
    isAdhocEnv,
    disableHealthCheck,
  };
  deployDebeziumKubernetesResources(
    name,
    namespace,
    debeziumPropsString,
    shared.debeziumKey,
    shared.disk,
    kubernetesOptions,
  );
}
13 changes: 12 additions & 1 deletion src/k8s.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
import * as gcp from '@pulumi/gcp';
import { autoscaling } from '@pulumi/kubernetes/types/input';
import { Resource } from '@pulumi/pulumi/resource';
import { camelToUnderscore } from './utils';
import { camelToUnderscore, gcpProjectNumber } from './utils';
import { NodeLabels } from './kubernetes';
import { defaultSpotWeight } from './constants';

Expand All @@ -28,6 +28,14 @@ export function k8sServiceAccountToIdentity(
);
}

/**
 * Builds the IAM `principal://` identifier for a Kubernetes service account in
 * the `<project>.svc.id.goog` workload identity pool.
 *
 * The project number comes from `gcpProjectNumber()`, the pool's project id
 * from `gcp.config.project`, and the subject from the service account's
 * namespace and name.
 *
 * @param serviceAccount - Kubernetes service account to derive the principal
 *   from.
 * @returns The principal string, resolved once the service account metadata is
 *   known.
 * @throws Error (when the metadata resolves) if `gcp.config.project` is not
 *   set, instead of silently emitting a principal for an `undefined` pool.
 */
export const k8sServiceAccountToWorkloadPrincipal = (
  serviceAccount: k8s.core.v1.ServiceAccount,
): Output<string> =>
  serviceAccount.metadata.apply((metadata) => {
    const projectNumber = gcpProjectNumber();
    const project = gcp.config.project;
    // `gcp.config.project` is optional; interpolating it unchecked would yield
    // the literal text "undefined.svc.id.goog" in the principal.
    if (!project) {
      throw new Error(
        'Cannot build a workload identity principal: the GCP project is not configured',
      );
    }
    return `principal://iam.googleapis.com/projects/${projectNumber}/locations/global/workloadIdentityPools/${project}.svc.id.goog/subject/ns/${metadata.namespace}/sa/${metadata.name}`;
  });

export function createK8sServiceAccountFromGCPServiceAccount(
resourceName: string,
name: string,
Expand Down Expand Up @@ -171,6 +179,9 @@ export const getMemoryAndCpuMetrics = (
},
];

/**
* @deprecated Use createAndBindK8sServiceAccount instead
*/
export const bindK8sServiceAccountToGCP = (
resourcePrefix: string,
name: string,
Expand Down
13 changes: 12 additions & 1 deletion src/suite/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
import {
deployDebeziumKubernetesResources,
deployDebeziumSharedDependencies,
deployDebeziumSharedDependenciesV2,
} from '../debezium';
import { location } from '../config';
import { stripCpuFromLimits } from '../utils';
Expand Down Expand Up @@ -73,7 +74,7 @@ export function customMetricToK8s(
}
}

function createAndBindK8sServiceAccount(
export function createAndBindK8sServiceAccount(
resourcePrefix: string | undefined,
name: string,
namespace: string,
Expand Down Expand Up @@ -458,6 +459,15 @@ export function deployApplicationSuiteToProvider({
isAdhocEnv,
},
);
const k8sServiceAccount = deployDebeziumSharedDependenciesV2(
{
name,
namespace,
resourcePrefix,
isAdhocEnv,
},
provider,
);
// Useful if we want to migrate Debezium without affecting its dependencies
if (!debezium.dependenciesOnly) {
const debeziumDefault = isAdhocEnv ? '2.0' : '1.6';
Expand All @@ -476,6 +486,7 @@ export function deployApplicationSuiteToProvider({
env: debezium.env,
disableHealthCheck: debezium.disableHealthCheck,
affinity: debezium.affinity,
k8sServiceAccount,
},
);
}
Expand Down
7 changes: 6 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { all, Input, Output } from '@pulumi/pulumi';
import { all, Config, Input, Output } from '@pulumi/pulumi';
import { PodResources } from './k8s';

export type AdhocEnv = { isAdhocEnv: boolean };
Expand All @@ -22,3 +22,8 @@ export function stripCpuFromLimits(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
return all([requests]).apply(([{ cpu, ...rest }]) => rest);
}

/**
 * Reads the `projectNumber` setting from a default-constructed Pulumi `Config`.
 *
 * NOTE(review): `Config.require` is expected to fail when the key is missing —
 * confirm against the Pulumi Config API.
 *
 * @returns The configured GCP project number.
 */
export const gcpProjectNumber = (): string =>
  new Config().require('projectNumber');

0 comments on commit 70c867b

Please sign in to comment.