From 410cb13fb6a7fe94a7e4c17dfdad06c4f972c8ee Mon Sep 17 00:00:00 2001 From: lmouhib Date: Fri, 2 Aug 2024 16:16:28 +0100 Subject: [PATCH 01/14] change kafka-api behavior --- framework/src/streaming/lib/msk/kafka-api-props.ts | 7 +++++++ framework/src/streaming/lib/msk/kafka-api.ts | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/framework/src/streaming/lib/msk/kafka-api-props.ts b/framework/src/streaming/lib/msk/kafka-api-props.ts index 7a6dacafa..5718c4933 100644 --- a/framework/src/streaming/lib/msk/kafka-api-props.ts +++ b/framework/src/streaming/lib/msk/kafka-api-props.ts @@ -88,4 +88,11 @@ export interface KafkaApiProps { * @default WARN */ readonly kafkaClientLogLevel?: KafkaClientLogLevel; + + /** + * If you there is an already existing service token deploy for the custom resource + * You can reuse it to reduce the number of resource created + * @default WARN + */ + readonly serviceToken?: string; } \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index f6c4d61a1..f5a10959d 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -94,6 +94,14 @@ export class KafkaApi extends TrackedConstruct { throw Error ('VPC requires the following attributes: "vpcId", "availabilityZones", "privateSubnets" '); } + if(props.clientAuthentication.saslProps?.iam == true && props.serviceToken) { + this.mskIamServiceToken = props.serviceToken; + } + + if(props.clientAuthentication.tlsProps?.certificateAuthorities && props.serviceToken) { + this.mskAclServiceToken = props.serviceToken; + } + if (props.clientAuthentication.tlsProps?.certificateAuthorities) { const mskAclProvider = mskAclAdminProviderSetup( From d29a6b03e04aab73e6f28c9795dd834fb9c86c91 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Mon, 9 Sep 2024 22:21:21 +0200 Subject: [PATCH 02/14] Removing creation of provider in case the service token is provided --- framework/API.md | 14 ++++ .../src/streaming/lib/msk/kafka-api-props.ts | 4 +- framework/src/streaming/lib/msk/kafka-api.ts | 76 ++++++++++--------- 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/framework/API.md b/framework/API.md index 15ce5b6b3..272a4303b 100644 --- a/framework/API.md +++ b/framework/API.md @@ -13388,6 +13388,7 @@ const kafkaApiProps: streaming.KafkaApiProps = { ... } | kafkaClientLogLevel | @cdklabs/aws-data-solutions-framework.streaming.KafkaClientLogLevel | The log level for the lambda that support the Custom Resource for both Managing ACLs and Topics. | | mtlsHandlerRole | aws-cdk-lib.aws_iam.IRole | The IAM role to pass to mTLS lambda handler This role must be able to be assumed with `lambda.amazonaws.com` service principal. | | removalPolicy | aws-cdk-lib.RemovalPolicy | The removal policy when deleting the CDK resource. | +| serviceToken | string | If you there is an already existing service token deploy for the custom resource You can reuse it to reduce the number of resource created. | | subnets | aws-cdk-lib.aws_ec2.SubnetSelection | The subnets where the Custom Resource Lambda Function would be created in. | --- @@ -13531,6 +13532,19 @@ Otherwise the removalPolicy is reverted to RETAIN. --- +##### `serviceToken`Optional + +```typescript +public readonly serviceToken: string; +``` + +- *Type:* string +- *Default:* WARN + +If you there is an already existing service token deploy for the custom resource You can reuse it to reduce the number of resource created. + +--- + ##### `subnets`Optional ```typescript diff --git a/framework/src/streaming/lib/msk/kafka-api-props.ts b/framework/src/streaming/lib/msk/kafka-api-props.ts index 5718c4933..7c0d838f9 100644 --- a/framework/src/streaming/lib/msk/kafka-api-props.ts +++ b/framework/src/streaming/lib/msk/kafka-api-props.ts @@ -90,8 +90,8 @@ export interface KafkaApiProps { readonly kafkaClientLogLevel?: KafkaClientLogLevel; /** - * If you there is an already existing service token deploy for the custom resource - * You can reuse it to reduce the number of resource created + * If you there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created * @default WARN */ readonly serviceToken?: string; diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index f5a10959d..bdb0c24b1 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -94,51 +94,55 @@ export class KafkaApi extends TrackedConstruct { throw Error ('VPC requires the following attributes: "vpcId", "availabilityZones", "privateSubnets" '); } - if(props.clientAuthentication.saslProps?.iam == true && props.serviceToken) { + if (props.clientAuthentication.saslProps?.iam == true && props.serviceToken) { this.mskIamServiceToken = props.serviceToken; } - if(props.clientAuthentication.tlsProps?.certificateAuthorities && props.serviceToken) { - this.mskAclServiceToken = props.serviceToken; - } - if (props.clientAuthentication.tlsProps?.certificateAuthorities) { - const mskAclProvider = mskAclAdminProviderSetup( - this, - this.removalPolicy, - props.vpc, - props.subnets || props.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), - props.brokerSecurityGroup, - props.clusterArn, - props.certficateSecret!, - props.mtlsHandlerRole, - ); - - this.mskAclServiceToken = mskAclProvider.serviceToken; - this.mskAclRole = mskAclProvider.onEventHandlerRole; - this.mskAclLogGroup = mskAclProvider.onEventHandlerLogGroup; - this.mskAclFunction = mskAclProvider.onEventHandlerFunction; - this.mskAclSecurityGroup = mskAclProvider.securityGroups; + if (props.serviceToken) { + this.mskAclServiceToken = props.serviceToken; + } else { + const mskAclProvider = mskAclAdminProviderSetup( + this, + this.removalPolicy, + props.vpc, + props.subnets || props.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), + props.brokerSecurityGroup, + props.clusterArn, + props.certficateSecret!, + props.mtlsHandlerRole, + ); + + this.mskAclServiceToken = mskAclProvider.serviceToken; + this.mskAclRole = mskAclProvider.onEventHandlerRole; + this.mskAclLogGroup = mskAclProvider.onEventHandlerLogGroup; + this.mskAclFunction = mskAclProvider.onEventHandlerFunction; + this.mskAclSecurityGroup = mskAclProvider.securityGroups; + } } if ( props.clientAuthentication.saslProps?.iam) { - const mskIamProvider = mskIamCrudProviderSetup( - this, - this.removalPolicy, - props.vpc, - props.subnets || props.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), - props.brokerSecurityGroup, - props.clusterArn, - props.iamHandlerRole, - ); - - this.mskIamServiceToken = mskIamProvider.serviceToken; - this.mskIamRole = mskIamProvider.onEventHandlerRole; - this.mskIamLogGroup = mskIamProvider.onEventHandlerLogGroup; - this.mskIamFunction = mskIamProvider.onEventHandlerFunction; - this.mskIamSecurityGroup = mskIamProvider.securityGroups; + if (props.serviceToken) { + this.mskAclServiceToken = props.serviceToken; + } else { + const mskIamProvider = mskIamCrudProviderSetup( + this, + this.removalPolicy, + props.vpc, + props.subnets || props.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), + props.brokerSecurityGroup, + props.clusterArn, + props.iamHandlerRole, + ); + + this.mskIamServiceToken = mskIamProvider.serviceToken; + this.mskIamRole = mskIamProvider.onEventHandlerRole; + this.mskIamLogGroup = mskIamProvider.onEventHandlerLogGroup; + this.mskIamFunction = mskIamProvider.onEventHandlerFunction; + this.mskIamSecurityGroup = mskIamProvider.securityGroups; + } } } From 99a167933fa36f6c89a43507a910ca3ac41ee1cb Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Tue, 10 Sep 2024 12:17:49 +0200 Subject: [PATCH 03/14] Fixed typo in doc and added test --- framework/API.md | 4 +- .../src/streaming/lib/msk/kafka-api-props.ts | 2 +- .../test/unit/streaming/kafka-api.test.ts | 62 ++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/framework/API.md b/framework/API.md index 272a4303b..e8e560d47 100644 --- a/framework/API.md +++ b/framework/API.md @@ -13388,7 +13388,7 @@ const kafkaApiProps: streaming.KafkaApiProps = { ... } | kafkaClientLogLevel | @cdklabs/aws-data-solutions-framework.streaming.KafkaClientLogLevel | The log level for the lambda that support the Custom Resource for both Managing ACLs and Topics. | | mtlsHandlerRole | aws-cdk-lib.aws_iam.IRole | The IAM role to pass to mTLS lambda handler This role must be able to be assumed with `lambda.amazonaws.com` service principal. | | removalPolicy | aws-cdk-lib.RemovalPolicy | The removal policy when deleting the CDK resource. | -| serviceToken | string | If you there is an already existing service token deploy for the custom resource You can reuse it to reduce the number of resource created. | +| serviceToken | string | If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. | | subnets | aws-cdk-lib.aws_ec2.SubnetSelection | The subnets where the Custom Resource Lambda Function would be created in. | --- @@ -13541,7 +13541,7 @@ public readonly serviceToken: string; - *Type:* string - *Default:* WARN -If you there is an already existing service token deploy for the custom resource You can reuse it to reduce the number of resource created. +If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. --- diff --git a/framework/src/streaming/lib/msk/kafka-api-props.ts b/framework/src/streaming/lib/msk/kafka-api-props.ts index 7c0d838f9..1375e17ee 100644 --- a/framework/src/streaming/lib/msk/kafka-api-props.ts +++ b/framework/src/streaming/lib/msk/kafka-api-props.ts @@ -90,7 +90,7 @@ export interface KafkaApiProps { readonly kafkaClientLogLevel?: KafkaClientLogLevel; /** - * If you there is an already existing service token deployed for the custom resource + * If there is an already existing service token deployed for the custom resource * you can reuse it to reduce the number of resource created * @default WARN */ diff --git a/framework/test/unit/streaming/kafka-api.test.ts b/framework/test/unit/streaming/kafka-api.test.ts index 912a9ebd0..aa17016d6 100644 --- a/framework/test/unit/streaming/kafka-api.test.ts +++ b/framework/test/unit/streaming/kafka-api.test.ts @@ -15,7 +15,7 @@ import { SecurityGroup, Vpc, Subnet } from 'aws-cdk-lib/aws-ec2'; import { Role } from 'aws-cdk-lib/aws-iam'; import { CfnCluster } from 'aws-cdk-lib/aws-msk'; import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; -import { Authentication, ClientAuthentication, KafkaApi, MskClusterType } from '../../../src/streaming'; +import { Authentication, ClientAuthentication, KafkaApi, MskClusterType, KafkaClientLogLevel } from '../../../src/streaming'; describe('Using default KafkaApi configuration with MSK provisioned and IAM and mTLS authentication should ', () => { @@ -1250,4 +1250,64 @@ describe('Using global removal policy and DELETE construct removal policy ', () DeletionPolicy: 'Delete', }); }); +}); + +describe('Using default KafkaApi configuration with MSK provisioned and IAM and mTLS authentication should ', () => { + + const app = new App(); + const stack = new Stack(app, 'Stack'); + + const brokerSecurityGroup = SecurityGroup.fromSecurityGroupId(stack, 'sg', 'sg-1234'); + const vpc = Vpc.fromVpcAttributes(stack, 'vpc', { + vpcId: 'XXXXXXXX', + availabilityZones: ['us-east-1a'], + vpcCidrBlock: '10.0.0.0/16', + privateSubnetIds: ['XXXXXXXX'], + }); + + const cluster = new CfnCluster(stack, 'MyCluster', { + clientAuthentication: { + sasl: { + iam: { + enabled: true, + }, + }, + }, + brokerNodeGroupInfo: { + clientSubnets: vpc.privateSubnets.map(s => s.subnetId), + instanceType: 'kafka.m5large', + securityGroups: [brokerSecurityGroup.securityGroupId], + }, + clusterName: 'XXXXXX', + kafkaVersion: '3.5.1', + numberOfBrokerNodes: 3, + }); + + const kafkaApi = new KafkaApi(stack, 'KafkaApi', { + clusterArn: cluster.attrArn, + clusterType: MskClusterType.PROVISIONED, + brokerSecurityGroup, + vpc, + clientAuthentication: ClientAuthentication.sasl({ + iam: true, + }), + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', + }); + + kafkaApi.setTopic('topic1', + Authentication.IAM, + { + topic: 'topic1', + numPartitions: 1, + }, + ); + + const template = Template.fromStack(stack, {}); + + test('should have a service token to reuse', () => { + template.hasResourceProperties('Custom::MskTopic', { + ServiceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', + }); + }); }); \ No newline at end of file From 8e434967c87b2e878a333e17a0448beba97bef99 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Tue, 10 Sep 2024 14:52:39 +0200 Subject: [PATCH 04/14] Adding service token to the msk serverless and msk provisioned. Exposed service token in msk serverless - provisioned - api. --- framework/src/streaming/README.md | 2 ++ .../src/streaming/examples/kafka-api-default.lit.ts | 1 + framework/src/streaming/lib/msk/kafka-api-props.ts | 1 - framework/src/streaming/lib/msk/kafka-api.ts | 5 +++++ .../src/streaming/lib/msk/msk-provisioned-props.ts | 6 ++++++ framework/src/streaming/lib/msk/msk-provisioned.ts | 10 +++++++++- .../src/streaming/lib/msk/msk-serverless-props.ts | 6 ++++++ framework/src/streaming/lib/msk/msk-serverless.ts | 7 +++++++ framework/test/e2e/kafka-api.e2e.test.ts | 4 ++++ 9 files changed, 40 insertions(+), 2 deletions(-) diff --git a/framework/src/streaming/README.md b/framework/src/streaming/README.md index 1ee088be0..aa55d2f89 100644 --- a/framework/src/streaming/README.md +++ b/framework/src/streaming/README.md @@ -202,6 +202,8 @@ The construct leverages the [CDK Provider Framework](https://docs.aws.amazon.com [example kafka api](./examples/kafka-api-default.lit.ts) +When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resource created, therefore the number of IP assigned to the custom resources. + :::warning The construct needs to be deployed in the same region as the MSK cluster. diff --git a/framework/src/streaming/examples/kafka-api-default.lit.ts b/framework/src/streaming/examples/kafka-api-default.lit.ts index ce7e8a837..0005726c7 100644 --- a/framework/src/streaming/examples/kafka-api-default.lit.ts +++ b/framework/src/streaming/examples/kafka-api-default.lit.ts @@ -43,6 +43,7 @@ const kafkaApi = new KafkaApi(stack, 'kafkaApi', { certificateAuthorities: [certificateAuthority], },), kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', }); /// !hide diff --git a/framework/src/streaming/lib/msk/kafka-api-props.ts b/framework/src/streaming/lib/msk/kafka-api-props.ts index 1375e17ee..1797e5719 100644 --- a/framework/src/streaming/lib/msk/kafka-api-props.ts +++ b/framework/src/streaming/lib/msk/kafka-api-props.ts @@ -92,7 +92,6 @@ export interface KafkaApiProps { /** * If there is an already existing service token deployed for the custom resource * you can reuse it to reduce the number of resource created - * @default WARN */ readonly serviceToken?: string; } \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index bdb0c24b1..6b2f6e952 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -59,6 +59,11 @@ export class KafkaApi extends TrackedConstruct { * The Security Group used by the Custom Resource provider when MSK is using mTLS authentication */ public readonly mskAclSecurityGroup?: ISecurityGroup []; + /** + * If there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created + */ + public readonly serviceToken?: string; private readonly mskAclServiceToken?: string; private readonly mskIamServiceToken?: string; diff --git a/framework/src/streaming/lib/msk/msk-provisioned-props.ts b/framework/src/streaming/lib/msk/msk-provisioned-props.ts index a35e3499e..e96d681f3 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned-props.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned-props.ts @@ -154,6 +154,12 @@ export interface MskProvisionedProps { * `aws kafka describe-cluster --cluster-arn YOUR_CLUSTER_ARN` */ readonly currentVersion?: string; + + /** + * If there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created + */ + readonly serviceToken?: string; } diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index f6c39572a..b6c84d39d 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -174,6 +174,11 @@ export class MskProvisioned extends TrackedConstruct { * The Security Group used by the Lambda responsible for CRUD operations via mTLS authentication */ public readonly inClusterAclSecurityGroup?: ISecurityGroup[]; + /** + * If there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created + */ + public readonly serviceToken?: string; private readonly removalPolicy: RemovalPolicy; private readonly region: string; @@ -472,8 +477,11 @@ export class MskProvisioned extends TrackedConstruct { kafkaClientLogLevel: props?.kafkaClientLogLevel, clusterType: MskClusterType.PROVISIONED, removalPolicy: this.removalPolicy, + serviceToken: props?.serviceToken, }); + this.serviceToken = this.kafkaApi.serviceToken; + // Create the configuration let clusterConfigurationInfo: ClusterConfigurationInfo; @@ -532,7 +540,7 @@ export class MskProvisioned extends TrackedConstruct { this.updateConnectivitySecurityGroup = updateConnectivityProvider.securityGroups; // Set the CR resource that are used by IAM credentials auth CR - // Applly the cluster configuration if provided and the cluster is created without mTLS auth + // Apply the cluster configuration if provided and the cluster is created without mTLS auth if (this.iamAcl) { this.iamCrudAdminRole = this.kafkaApi.mskAclRole; diff --git a/framework/src/streaming/lib/msk/msk-serverless-props.ts b/framework/src/streaming/lib/msk/msk-serverless-props.ts index c75db5ff4..9180345ab 100644 --- a/framework/src/streaming/lib/msk/msk-serverless-props.ts +++ b/framework/src/streaming/lib/msk/msk-serverless-props.ts @@ -51,4 +51,10 @@ export interface MskServerlessProps { * @default - The resources are not deleted (`RemovalPolicy.RETAIN`). */ readonly removalPolicy?: RemovalPolicy; + + /** + * If there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created + */ + readonly serviceToken?: string; } diff --git a/framework/src/streaming/lib/msk/msk-serverless.ts b/framework/src/streaming/lib/msk/msk-serverless.ts index d3f649675..a82576a88 100644 --- a/framework/src/streaming/lib/msk/msk-serverless.ts +++ b/framework/src/streaming/lib/msk/msk-serverless.ts @@ -28,6 +28,11 @@ export class MskServerless extends TrackedConstruct { public readonly brokerSecurityGroup?: ISecurityGroup; public readonly clusterName: string; public readonly lambdaSecurityGroup: ISecurityGroup; + /** + * If there is an already existing service token deployed for the custom resource + * you can reuse it to reduce the number of resource created + */ + public readonly serviceToken?: string; private readonly removalPolicy: RemovalPolicy; private readonly kafkaApi: KafkaApi; @@ -110,8 +115,10 @@ export class MskServerless extends TrackedConstruct { clientAuthentication: ClientAuthentication.sasl( { iam: true }), clusterType: MskClusterType.SERVERLESS, kafkaClientLogLevel: props?.kafkaClientLogLevel ?? KafkaClientLogLevel.WARN, + serviceToken: props?.serviceToken, }); + this.serviceToken = this.kafkaApi.serviceToken; } /** diff --git a/framework/test/e2e/kafka-api.e2e.test.ts b/framework/test/e2e/kafka-api.e2e.test.ts index 149086287..355fe7954 100644 --- a/framework/test/e2e/kafka-api.e2e.test.ts +++ b/framework/test/e2e/kafka-api.e2e.test.ts @@ -112,6 +112,10 @@ new CfnOutput(stack, 'clusterArn', { value: cfnCluster.attrArn, }); +new CfnOutput(stack, 'serviceToken', { + value: mskApi.serviceToken!, +}); + let deployResult: Record; beforeAll(async() => { From 1b2af3b3c47c46562626f60ec6cec296c230f2e0 Mon Sep 17 00:00:00 2001 From: lmouhib Date: Tue, 10 Sep 2024 14:10:17 +0100 Subject: [PATCH 05/14] Rework the service token exposure --- framework/API.md | 40 ++++++++- .../lib/msk/msk-provisioned-props.ts | 6 -- .../src/streaming/lib/msk/msk-provisioned.ts | 8 +- .../streaming/lib/msk/msk-serverless-props.ts | 6 -- .../src/streaming/lib/msk/msk-serverless.ts | 8 +- .../kafka-api-byo-service-token.e2e.test.ts | 81 +++++++++++++++++++ .../generated/_streaming-kafka-api.mdx | 6 +- 7 files changed, 137 insertions(+), 18 deletions(-) create mode 100644 framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts diff --git a/framework/API.md b/framework/API.md index e8e560d47..5ebfbcffc 100644 --- a/framework/API.md +++ b/framework/API.md @@ -4547,6 +4547,7 @@ Any object. | mskIamLogGroup | aws-cdk-lib.aws_logs.ILogGroup | The Cloudwatch Log Group used by the Custom Resource provider when MSK is using IAM authentication. | | mskIamRole | aws-cdk-lib.aws_iam.IRole | The IAM Role used by the Custom Resource provider when MSK is using IAM authentication. | | mskIamSecurityGroup | aws-cdk-lib.aws_ec2.ISecurityGroup[] | The Security Group used by the Custom Resource provider when MSK is using IAM authentication. | +| serviceToken | string | If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. | --- @@ -4658,6 +4659,18 @@ The Security Group used by the Custom Resource provider when MSK is using IAM au --- +##### `serviceToken`Optional + +```typescript +public readonly serviceToken: string; +``` + +- *Type:* string + +If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. + +--- + #### Constants | **Name** | **Type** | **Description** | @@ -5179,6 +5192,7 @@ streaming.MskProvisioned.createClusterConfiguration(scope: Construct, id: string | inClusterAclLogGroup | aws-cdk-lib.aws_logs.ILogGroup | The CloudWatch Log Group used by the Lambda responsible for CRUD operations via mTLS authentication. | | inClusterAclRole | aws-cdk-lib.aws_iam.IRole | The IAM role used by the Lambda responsible for CRUD operations via mTLS authentication. | | inClusterAclSecurityGroup | aws-cdk-lib.aws_ec2.ISecurityGroup[] | The Security Group used by the Lambda responsible for CRUD operations via mTLS authentication. | +| serviceToken | string | If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. | | updateConnectivityFunction | aws-cdk-lib.aws_lambda.IFunction | The Lambda function responsible for updating MSK Connectivity. | | updateConnectivityLogGroup | aws-cdk-lib.aws_logs.ILogGroup | The CloudWatch Log Group used by the Lambda responsible for updating MSK Connectivity. | | updateConnectivityRole | aws-cdk-lib.aws_iam.IRole | The IAM Role used by the Lambda responsible for updating MSK Connectivity. | @@ -5418,6 +5432,18 @@ The Security Group used by the Lambda responsible for CRUD operations via mTLS a --- +##### `serviceToken`Optional + +```typescript +public readonly serviceToken: string; +``` + +- *Type:* string + +If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. + +--- + ##### `updateConnectivityFunction`Optional ```typescript @@ -5804,6 +5830,7 @@ Any object. | lambdaSecurityGroup | aws-cdk-lib.aws_ec2.ISecurityGroup | *No description.* | | vpc | aws-cdk-lib.aws_ec2.IVpc | *No description.* | | brokerSecurityGroup | aws-cdk-lib.aws_ec2.ISecurityGroup | *No description.* | +| serviceToken | string | If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. | --- @@ -5869,6 +5896,18 @@ public readonly brokerSecurityGroup: ISecurityGroup; --- +##### `serviceToken`Optional + +```typescript +public readonly serviceToken: string; +``` + +- *Type:* string + +If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. + +--- + #### Constants | **Name** | **Type** | **Description** | @@ -13539,7 +13578,6 @@ public readonly serviceToken: string; ``` - *Type:* string -- *Default:* WARN If there is an already existing service token deployed for the custom resource you can reuse it to reduce the number of resource created. diff --git a/framework/src/streaming/lib/msk/msk-provisioned-props.ts b/framework/src/streaming/lib/msk/msk-provisioned-props.ts index e96d681f3..a35e3499e 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned-props.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned-props.ts @@ -154,12 +154,6 @@ export interface MskProvisionedProps { * `aws kafka describe-cluster --cluster-arn YOUR_CLUSTER_ARN` */ readonly currentVersion?: string; - - /** - * If there is an already existing service token deployed for the custom resource - * you can reuse it to reduce the number of resource created - */ - readonly serviceToken?: string; } diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index b6c84d39d..588e515cc 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -5,7 +5,7 @@ import { readFileSync } from 'fs'; import { join } from 'path'; -import { CustomResource, Duration, RemovalPolicy, Stack } from 'aws-cdk-lib'; +import { CfnOutput, CustomResource, Duration, RemovalPolicy, Stack } from 'aws-cdk-lib'; import { Connections, ISecurityGroup, IVpc, SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; import { IPrincipal, IRole, ManagedPolicy, PolicyDocument, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'; import { IKey, Key } from 'aws-cdk-lib/aws-kms'; @@ -477,7 +477,6 @@ export class MskProvisioned extends TrackedConstruct { kafkaClientLogLevel: props?.kafkaClientLogLevel, clusterType: MskClusterType.PROVISIONED, removalPolicy: this.removalPolicy, - serviceToken: props?.serviceToken, }); this.serviceToken = this.kafkaApi.serviceToken; @@ -642,6 +641,11 @@ export class MskProvisioned extends TrackedConstruct { } } + + new CfnOutput(this, 'ServiceToken', { + value: this.kafkaApi.serviceToken!, + exportName: 'ServiceToken', + }); } diff --git a/framework/src/streaming/lib/msk/msk-serverless-props.ts b/framework/src/streaming/lib/msk/msk-serverless-props.ts index 9180345ab..c75db5ff4 100644 --- a/framework/src/streaming/lib/msk/msk-serverless-props.ts +++ b/framework/src/streaming/lib/msk/msk-serverless-props.ts @@ -51,10 +51,4 @@ export interface MskServerlessProps { * @default - The resources are not deleted (`RemovalPolicy.RETAIN`). */ readonly removalPolicy?: RemovalPolicy; - - /** - * If there is an already existing service token deployed for the custom resource - * you can reuse it to reduce the number of resource created - */ - readonly serviceToken?: string; } diff --git a/framework/src/streaming/lib/msk/msk-serverless.ts b/framework/src/streaming/lib/msk/msk-serverless.ts index a82576a88..52515a1be 100644 --- a/framework/src/streaming/lib/msk/msk-serverless.ts +++ b/framework/src/streaming/lib/msk/msk-serverless.ts @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { CustomResource, RemovalPolicy } from 'aws-cdk-lib'; +import { CfnOutput, CustomResource, RemovalPolicy } from 'aws-cdk-lib'; import { ISecurityGroup, IVpc, SecurityGroup, SubnetSelection } from 'aws-cdk-lib/aws-ec2'; import { IPrincipal, PolicyDocument } from 'aws-cdk-lib/aws-iam'; import { CfnClusterPolicy, CfnServerlessCluster } from 'aws-cdk-lib/aws-msk'; @@ -115,10 +115,14 @@ export class MskServerless extends TrackedConstruct { clientAuthentication: ClientAuthentication.sasl( { iam: true }), clusterType: MskClusterType.SERVERLESS, kafkaClientLogLevel: props?.kafkaClientLogLevel ?? KafkaClientLogLevel.WARN, - serviceToken: props?.serviceToken, }); this.serviceToken = this.kafkaApi.serviceToken; + + new CfnOutput(this, 'ServiceToken', { + value: this.kafkaApi.serviceToken!, + exportName: 'ServiceToken', + }); } /** diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts new file mode 100644 index 000000000..a613bebae --- /dev/null +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -0,0 +1,81 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +/** + * E2E test for the KafkaApi construct + * + * @group e2e/streaming/kafka-api-byo-service-token + */ + + +import { App, RemovalPolicy, CfnOutput } from 'aws-cdk-lib'; + +import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca'; +import { SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; +import { CfnCluster } from 'aws-cdk-lib/aws-msk'; +import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; +import { TestStack } from './test-stack'; +import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaApi, KafkaClientLogLevel, MskClusterType, MskServerless, ResourcePatternTypes } from '../../src/streaming'; +import { DataVpc, Utils } from '../../src/utils'; + + +jest.setTimeout(10000000); + +// GIVEN +const app = new App(); +const testStack = new TestStack('KafkaAPiTestStack', app); +const { stack } = testStack; +stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); + +let vpc = new DataVpc(stack, 'vpc', { + vpcCidr: '10.0.0.0/16', + removalPolicy: RemovalPolicy.DESTROY, +}); + +let securityGroup = SecurityGroup.fromSecurityGroupId(stack, 'securityGroup', vpc.vpc.vpcDefaultSecurityGroup); + +const msk = new MskServerless(stack, 'cluster', { + clusterName: `cluster-serverless${Utils.generateHash(stack.stackName).slice(0, 3)}`, + vpc: vpc.vpc, + subnets: vpc.vpc.selectSubnets(), + securityGroups: [securityGroup], + removalPolicy: RemovalPolicy.DESTROY, + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, +}); + +const mskApi = new KafkaApi(stack, 'kafkaApi', { + vpc: vpc.vpc, + clusterArn: msk.cluster.attrArn, + subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), + brokerSecurityGroup: securityGroup, + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + clusterType: MskClusterType.SERVERLESS, + removalPolicy: RemovalPolicy.DESTROY, + clientAuthentication: ClientAuthentication.sasl({ iam: true }), + serviceToken: msk.serviceToken, +}); + +msk.addTopic('topicServerelss', { + topic: 'dummy', + numPartitions: 1, +}, RemovalPolicy.DESTROY, false, 1500); + +new CfnOutput(stack, 'clusterArn', { + value: msk.cluster.attrArn, +}); + +let deployResult: Record; + +beforeAll(async () => { + // WHEN + deployResult = await testStack.deploy(); +}, 10000000); + +it('Containers runtime created successfully', async () => { + // THEN + expect(deployResult.clusterArn).toContain('arn'); +}); + +afterAll(async () => { + await testStack.destroy(); +}, 10000000); \ No newline at end of file diff --git a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx index e155ca88c..5e0fd5856 100644 --- a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx +++ b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx @@ -43,6 +43,7 @@ const kafkaApi = new KafkaApi(stack, 'kafkaApi', { certificateAuthorities: [certificateAuthority], },), kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', }); ``` @@ -72,13 +73,16 @@ kafka_api = KafkaApi(stack, "kafkaApi", iam=True, certificate_authorities=[certificate_authority] ), - kafka_client_log_level=KafkaClientLogLevel.DEBUG + kafka_client_log_level=KafkaClientLogLevel.DEBUG, + service_token="arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX" ) ``` +When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resource created, therefore the number of IP assigned to the custom resources. + :::warning The construct needs to be deployed in the same region as the MSK cluster. From 4d1f3b32474bfdaafc619427e8c796c314f5570c Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Tue, 10 Sep 2024 16:46:01 +0200 Subject: [PATCH 06/14] Fixed test kafkaApi e2e test --- framework/src/streaming/lib/msk/kafka-api.ts | 2 ++ .../src/streaming/lib/msk/msk-provisioned.ts | 2 +- .../src/streaming/lib/msk/msk-serverless.ts | 2 +- .../kafka-api-byo-service-token.e2e.test.ts | 30 ++++++++++++------- framework/test/e2e/kafka-api.e2e.test.ts | 6 +--- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index 6b2f6e952..b641a7101 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -124,6 +124,7 @@ export class KafkaApi extends TrackedConstruct { this.mskAclLogGroup = mskAclProvider.onEventHandlerLogGroup; this.mskAclFunction = mskAclProvider.onEventHandlerFunction; this.mskAclSecurityGroup = mskAclProvider.securityGroups; + this.serviceToken = mskAclProvider.serviceToken; } } @@ -147,6 +148,7 @@ export class KafkaApi extends TrackedConstruct { this.mskIamLogGroup = mskIamProvider.onEventHandlerLogGroup; this.mskIamFunction = mskIamProvider.onEventHandlerFunction; this.mskIamSecurityGroup = mskIamProvider.securityGroups; + this.serviceToken = mskIamProvider.serviceToken; } } diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index 588e515cc..a56d2d094 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -643,7 +643,7 @@ export class MskProvisioned extends TrackedConstruct { } new CfnOutput(this, 'ServiceToken', { - value: this.kafkaApi.serviceToken!, + value: this.serviceToken!, exportName: 'ServiceToken', }); diff --git a/framework/src/streaming/lib/msk/msk-serverless.ts b/framework/src/streaming/lib/msk/msk-serverless.ts index 52515a1be..c2bb493b1 100644 --- a/framework/src/streaming/lib/msk/msk-serverless.ts +++ b/framework/src/streaming/lib/msk/msk-serverless.ts @@ -120,7 +120,7 @@ export class MskServerless extends TrackedConstruct { this.serviceToken = this.kafkaApi.serviceToken; new CfnOutput(this, 'ServiceToken', { - value: this.kafkaApi.serviceToken!, + value: this.serviceToken!, exportName: 'ServiceToken', }); } diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts index a613bebae..064e029a7 100644 --- a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -10,12 +10,9 @@ import { App, RemovalPolicy, CfnOutput } from 'aws-cdk-lib'; -import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca'; import { SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; -import { CfnCluster } from 'aws-cdk-lib/aws-msk'; -import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; import { TestStack } from './test-stack'; -import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaApi, KafkaClientLogLevel, MskClusterType, MskServerless, ResourcePatternTypes } from '../../src/streaming'; +import { ClientAuthentication, KafkaApi, KafkaClientLogLevel, MskClusterType, MskServerless, Authentication} from '../../src/streaming'; import { DataVpc, Utils } from '../../src/utils'; @@ -43,7 +40,7 @@ const msk = new MskServerless(stack, 'cluster', { kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, }); -const mskApi = new KafkaApi(stack, 'kafkaApi', { +const kafkaApi = new KafkaApi(stack, 'kafkaApi', { vpc: vpc.vpc, clusterArn: msk.cluster.attrArn, subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), @@ -55,10 +52,16 @@ const mskApi = new KafkaApi(stack, 'kafkaApi', { serviceToken: msk.serviceToken, }); -msk.addTopic('topicServerelss', { - topic: 'dummy', - numPartitions: 1, -}, RemovalPolicy.DESTROY, false, 1500); + +kafkaApi.setTopic('dummyTopic', + Authentication.IAM, + { + topic: 'dummy', + numPartitions: 3, + }, + RemovalPolicy.DESTROY, + true, 1000 +); new CfnOutput(stack, 'clusterArn', { value: msk.cluster.attrArn, @@ -71,11 +74,18 @@ beforeAll(async () => { deployResult = await testStack.deploy(); }, 10000000); -it('Containers runtime created successfully', async () => { +test('MSK cluster created successfully', async () => { // THEN expect(deployResult.clusterArn).toContain('arn'); }); +/* +test('Kafka API outputs service token successfully', async () => { + // THEN + expect(deployResult.ServiceToken).toContain('arn'); +}); +*/ + afterAll(async () => { await testStack.destroy(); }, 10000000); \ No newline at end of file diff --git a/framework/test/e2e/kafka-api.e2e.test.ts b/framework/test/e2e/kafka-api.e2e.test.ts index 355fe7954..7085b716c 100644 --- a/framework/test/e2e/kafka-api.e2e.test.ts +++ b/framework/test/e2e/kafka-api.e2e.test.ts @@ -112,10 +112,6 @@ new CfnOutput(stack, 'clusterArn', { value: cfnCluster.attrArn, }); -new CfnOutput(stack, 'serviceToken', { - value: mskApi.serviceToken!, -}); - let deployResult: Record; beforeAll(async() => { @@ -123,7 +119,7 @@ beforeAll(async() => { deployResult = await testStack.deploy(); }, 10000000); -it('Containers runtime created successfully', async () => { +test('CMSK cluster created successfully', async () => { // THEN expect(deployResult.clusterArn).toContain('arn'); }); From 390fbe617e7b05b63b003491f64d8d5f737e8d29 Mon Sep 17 00:00:00 2001 From: lmouhib Date: Tue, 10 Sep 2024 16:29:10 +0100 Subject: [PATCH 07/14] lint --- .../src/streaming/lib/msk/msk-provisioned.ts | 4 +- .../src/streaming/lib/msk/msk-serverless.ts | 2 +- .../kafka-api-byo-service-token.e2e.test.ts | 62 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index a56d2d094..91ab9e417 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -641,8 +641,8 @@ export class MskProvisioned extends TrackedConstruct { } } - - new CfnOutput(this, 'ServiceToken', { + + new CfnOutput(this, 'ServiceToken', { value: this.serviceToken!, exportName: 'ServiceToken', }); diff --git a/framework/src/streaming/lib/msk/msk-serverless.ts b/framework/src/streaming/lib/msk/msk-serverless.ts index c2bb493b1..ad58633fb 100644 --- a/framework/src/streaming/lib/msk/msk-serverless.ts +++ b/framework/src/streaming/lib/msk/msk-serverless.ts @@ -119,7 +119,7 @@ export class MskServerless extends TrackedConstruct { this.serviceToken = this.kafkaApi.serviceToken; - new CfnOutput(this, 'ServiceToken', { + new CfnOutput(this, 'ServiceToken', { value: this.serviceToken!, exportName: 'ServiceToken', }); diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts index 064e029a7..d007d4a95 100644 --- a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -12,7 +12,7 @@ import { App, RemovalPolicy, CfnOutput } from 'aws-cdk-lib'; import { SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; import { TestStack } from './test-stack'; -import { ClientAuthentication, KafkaApi, KafkaClientLogLevel, MskClusterType, MskServerless, Authentication} from '../../src/streaming'; +import { ClientAuthentication, KafkaApi, KafkaClientLogLevel, MskClusterType, MskServerless, Authentication } from '../../src/streaming'; import { DataVpc, Utils } from '../../src/utils'; @@ -25,58 +25,58 @@ const { stack } = testStack; stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); let vpc = new DataVpc(stack, 'vpc', { - vpcCidr: '10.0.0.0/16', - removalPolicy: RemovalPolicy.DESTROY, + vpcCidr: '10.0.0.0/16', + removalPolicy: RemovalPolicy.DESTROY, }); let securityGroup = SecurityGroup.fromSecurityGroupId(stack, 'securityGroup', vpc.vpc.vpcDefaultSecurityGroup); const msk = new MskServerless(stack, 'cluster', { - clusterName: `cluster-serverless${Utils.generateHash(stack.stackName).slice(0, 3)}`, - vpc: vpc.vpc, - subnets: vpc.vpc.selectSubnets(), - securityGroups: [securityGroup], - removalPolicy: RemovalPolicy.DESTROY, - kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + clusterName: `cluster-serverless${Utils.generateHash(stack.stackName).slice(0, 3)}`, + vpc: vpc.vpc, + subnets: vpc.vpc.selectSubnets(), + securityGroups: [securityGroup], + removalPolicy: RemovalPolicy.DESTROY, + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, }); const kafkaApi = new KafkaApi(stack, 'kafkaApi', { - vpc: vpc.vpc, - clusterArn: msk.cluster.attrArn, - subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), - brokerSecurityGroup: securityGroup, - kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, - clusterType: MskClusterType.SERVERLESS, - removalPolicy: RemovalPolicy.DESTROY, - clientAuthentication: ClientAuthentication.sasl({ iam: true }), - serviceToken: msk.serviceToken, + vpc: vpc.vpc, + clusterArn: msk.cluster.attrArn, + subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), + brokerSecurityGroup: securityGroup, + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + clusterType: MskClusterType.SERVERLESS, + removalPolicy: RemovalPolicy.DESTROY, + clientAuthentication: ClientAuthentication.sasl({ iam: true }), + serviceToken: msk.serviceToken, }); kafkaApi.setTopic('dummyTopic', - Authentication.IAM, - { - topic: 'dummy', - numPartitions: 3, - }, - RemovalPolicy.DESTROY, - true, 1000 + Authentication.IAM, + { + topic: 'dummy', + numPartitions: 3, + }, + RemovalPolicy.DESTROY, + true, 1000, ); new CfnOutput(stack, 'clusterArn', { - value: msk.cluster.attrArn, + value: msk.cluster.attrArn, }); let deployResult: Record; beforeAll(async () => { - // WHEN - deployResult = await testStack.deploy(); + // WHEN + deployResult = await testStack.deploy(); }, 10000000); test('MSK cluster created successfully', async () => { - // THEN - expect(deployResult.clusterArn).toContain('arn'); + // THEN + expect(deployResult.clusterArn).toContain('arn'); }); /* @@ -87,5 +87,5 @@ test('Kafka API outputs service token successfully', async () => { */ afterAll(async () => { - await testStack.destroy(); + await testStack.destroy(); }, 10000000); \ No newline at end of file From 5c4ac02b72b99f56b164549fbd5700eabde50920 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 11 Sep 2024 17:31:51 +0200 Subject: [PATCH 08/14] Added check on output to validate servicetoken in kafka api --- framework/src/streaming/lib/msk/kafka-api.ts | 8 +- .../src/streaming/lib/msk/msk-provisioned.ts | 4 +- .../src/streaming/lib/msk/msk-serverless.ts | 5 +- .../kafka-api-byo-service-token.e2e.test.ts | 79 +++++++++++++++---- 4 files changed, 72 insertions(+), 24 deletions(-) diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index b641a7101..16faf78d4 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -99,14 +99,11 @@ export class KafkaApi extends TrackedConstruct { throw Error ('VPC requires the following attributes: "vpcId", "availabilityZones", "privateSubnets" '); } - if (props.clientAuthentication.saslProps?.iam == true && props.serviceToken) { - this.mskIamServiceToken = props.serviceToken; - } - if (props.clientAuthentication.tlsProps?.certificateAuthorities) { if (props.serviceToken) { this.mskAclServiceToken = props.serviceToken; + this.serviceToken = props.serviceToken; } else { const mskAclProvider = mskAclAdminProviderSetup( this, @@ -131,7 +128,8 @@ export class KafkaApi extends TrackedConstruct { if ( props.clientAuthentication.saslProps?.iam) { if (props.serviceToken) { - this.mskAclServiceToken = props.serviceToken; + this.mskIamServiceToken = props.serviceToken; + this.serviceToken = props.serviceToken; } else { const mskIamProvider = mskIamCrudProviderSetup( this, diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index 91ab9e417..d6a3de429 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -5,7 +5,7 @@ import { readFileSync } from 'fs'; import { join } from 'path'; -import { CfnOutput, CustomResource, Duration, RemovalPolicy, Stack } from 'aws-cdk-lib'; +import { CfnOutput, CustomResource, Duration, RemovalPolicy, Stack, Aws } from 'aws-cdk-lib'; import { Connections, ISecurityGroup, IVpc, SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; import { IPrincipal, IRole, ManagedPolicy, PolicyDocument, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'; import { IKey, Key } from 'aws-cdk-lib/aws-kms'; @@ -644,7 +644,7 @@ export class MskProvisioned extends TrackedConstruct { new CfnOutput(this, 'ServiceToken', { value: this.serviceToken!, - exportName: 'ServiceToken', + exportName: `${Aws.STACK_NAME}-ServiceToken`, }); } diff --git a/framework/src/streaming/lib/msk/msk-serverless.ts b/framework/src/streaming/lib/msk/msk-serverless.ts index ad58633fb..3d762bccb 100644 --- a/framework/src/streaming/lib/msk/msk-serverless.ts +++ b/framework/src/streaming/lib/msk/msk-serverless.ts @@ -1,7 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -import { CfnOutput, CustomResource, RemovalPolicy } from 'aws-cdk-lib'; +import { CfnOutput, CustomResource, RemovalPolicy, Aws } from 'aws-cdk-lib'; import { ISecurityGroup, IVpc, SecurityGroup, SubnetSelection } from 'aws-cdk-lib/aws-ec2'; import { IPrincipal, PolicyDocument } from 'aws-cdk-lib/aws-iam'; import { CfnClusterPolicy, CfnServerlessCluster } from 'aws-cdk-lib/aws-msk'; @@ -118,10 +118,9 @@ export class MskServerless extends TrackedConstruct { }); this.serviceToken = this.kafkaApi.serviceToken; - new CfnOutput(this, 'ServiceToken', { value: this.serviceToken!, - exportName: 'ServiceToken', + exportName: `${Aws.STACK_NAME}-ServiceToken`, }); } diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts index d007d4a95..929ef2964 100644 --- a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -8,7 +8,7 @@ */ -import { App, RemovalPolicy, CfnOutput } from 'aws-cdk-lib'; +import { App, RemovalPolicy, CfnOutput, Fn } from 'aws-cdk-lib'; import { SecurityGroup, SubnetType } from 'aws-cdk-lib/aws-ec2'; import { TestStack } from './test-stack'; @@ -21,18 +21,18 @@ jest.setTimeout(10000000); // GIVEN const app = new App(); const testStack = new TestStack('KafkaAPiTestStack', app); -const { stack } = testStack; -stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); +const { stack: stackNewToken } = testStack; +stackNewToken.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); -let vpc = new DataVpc(stack, 'vpc', { +let vpc = new DataVpc(stackNewToken, 'vpc', { vpcCidr: '10.0.0.0/16', removalPolicy: RemovalPolicy.DESTROY, }); -let securityGroup = SecurityGroup.fromSecurityGroupId(stack, 'securityGroup', vpc.vpc.vpcDefaultSecurityGroup); +let securityGroup = SecurityGroup.fromSecurityGroupId(stackNewToken, 'securityGroup', vpc.vpc.vpcDefaultSecurityGroup); -const msk = new MskServerless(stack, 'cluster', { - clusterName: `cluster-serverless${Utils.generateHash(stack.stackName).slice(0, 3)}`, +const msk = new MskServerless(stackNewToken, 'cluster', { + clusterName: `cluster-serverless${Utils.generateHash(stackNewToken.stackName).slice(0, 3)}`, vpc: vpc.vpc, subnets: vpc.vpc.selectSubnets(), securityGroups: [securityGroup], @@ -40,7 +40,7 @@ const msk = new MskServerless(stack, 'cluster', { kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, }); -const kafkaApi = new KafkaApi(stack, 'kafkaApi', { +const kafkaApi = new KafkaApi(stackNewToken, 'kafkaApi', { vpc: vpc.vpc, clusterArn: msk.cluster.attrArn, subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), @@ -52,7 +52,6 @@ const kafkaApi = new KafkaApi(stack, 'kafkaApi', { serviceToken: msk.serviceToken, }); - kafkaApi.setTopic('dummyTopic', Authentication.IAM, { @@ -63,29 +62,81 @@ kafkaApi.setTopic('dummyTopic', true, 1000, ); -new CfnOutput(stack, 'clusterArn', { +new CfnOutput(stackNewToken, 'clusterArn', { value: msk.cluster.attrArn, }); let deployResult: Record; +// GIVEN +const testStackReuseServiceToken = new TestStack('KafkaAPiTestStackReuseServiceToken', app); +const { stack: stackReuseServiceToken} = testStackReuseServiceToken; +stackReuseServiceToken.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); + +const importedServiceToken = Fn.importValue(stackNewToken.stackName+'-ServiceToken'); + +const kafkaApiReuseServiceToken = new KafkaApi(stackReuseServiceToken, 'kafkaApiReuseServiceToken', { + vpc: vpc.vpc, + clusterArn: msk.cluster.attrArn, + subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), + brokerSecurityGroup: securityGroup, + kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, + clusterType: MskClusterType.SERVERLESS, + removalPolicy: RemovalPolicy.DESTROY, + clientAuthentication: ClientAuthentication.sasl({ iam: true }), + serviceToken: importedServiceToken, +}); + + +kafkaApiReuseServiceToken.setTopic('dummyTopicServiceToken', + Authentication.IAM, + { + topic: 'dummyServiceToken', + numPartitions: 3, + }, + RemovalPolicy.DESTROY, + true, 1000, +); + +new CfnOutput(stackReuseServiceToken, 'reusedServiceToken', { + value: kafkaApiReuseServiceToken.serviceToken!, + }); + +let deployResultServiceToken: Record; + beforeAll(async () => { // WHEN deployResult = await testStack.deploy(); + deployResultServiceToken = await testStackReuseServiceToken.deploy(); }, 10000000); test('MSK cluster created successfully', async () => { // THEN - expect(deployResult.clusterArn).toContain('arn'); + expect(deployResult['clusterArn']).toContain('arn'); }); -/* test('Kafka API outputs service token successfully', async () => { // THEN - expect(deployResult.ServiceToken).toContain('arn'); + const keyWithServiceToken = Object.keys(deployResult).find(key => key.includes('ServiceToken')); + if (keyWithServiceToken) { + expect(deployResult[keyWithServiceToken]).toContain('arn'); + } else { + throw new Error('ServiceToken not found in deploy result'); + } +}); + +test('Kafka API reuses service token successfully', async () => { + // THEN + expect(deployResultServiceToken.reusedServiceToken).toContain('arn'); + const keyWithServiceToken = Object.keys(deployResult).find(key => key.includes('ServiceToken')); + if (keyWithServiceToken) { + expect(deployResult[keyWithServiceToken]).toEqual(deployResultServiceToken.reusedServiceToken); + } else { + throw new Error('ServiceToken not found in deploy result'); + } }); -*/ afterAll(async () => { await testStack.destroy(); + await testStackReuseServiceToken.destroy(); }, 10000000); \ No newline at end of file From 253efde9f1ab3c46e32ba46b1f2724297d5d1cb4 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 11 Sep 2024 17:37:23 +0200 Subject: [PATCH 09/14] fix typo in e2e test description --- framework/test/e2e/kafka-api.e2e.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/test/e2e/kafka-api.e2e.test.ts b/framework/test/e2e/kafka-api.e2e.test.ts index 7085b716c..a9ae15fe1 100644 --- a/framework/test/e2e/kafka-api.e2e.test.ts +++ b/framework/test/e2e/kafka-api.e2e.test.ts @@ -119,7 +119,7 @@ beforeAll(async() => { deployResult = await testStack.deploy(); }, 10000000); -test('CMSK cluster created successfully', async () => { +test('MSK cluster created successfully', async () => { // THEN expect(deployResult.clusterArn).toContain('arn'); }); From 9169f538567552622e06fad076a18c490fbe7f9a Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 11 Sep 2024 18:57:05 +0200 Subject: [PATCH 10/14] Reducing number of parameters to kafka api when reusing service token in e2e --- framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts index 929ef2964..d5022e15d 100644 --- a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -78,11 +78,8 @@ const importedServiceToken = Fn.importValue(stackNewToken.stackName+'-ServiceTok const kafkaApiReuseServiceToken = new KafkaApi(stackReuseServiceToken, 'kafkaApiReuseServiceToken', { vpc: vpc.vpc, clusterArn: msk.cluster.attrArn, - subnets: vpc.vpc.selectSubnets({ subnetType: SubnetType.PRIVATE_WITH_EGRESS }), brokerSecurityGroup: securityGroup, - kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, clusterType: MskClusterType.SERVERLESS, - removalPolicy: RemovalPolicy.DESTROY, clientAuthentication: ClientAuthentication.sasl({ iam: true }), serviceToken: importedServiceToken, }); From eb3faf23c5b46bf42a70b0aefdf3d01ba98c2f93 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 11 Sep 2024 23:53:38 +0200 Subject: [PATCH 11/14] simplify assignation and removed redundant test --- framework/src/streaming/lib/msk/kafka-api.ts | 6 +- .../test/unit/streaming/kafka-api.test.ts | 60 ------------------- 2 files changed, 2 insertions(+), 64 deletions(-) diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index 16faf78d4..0f5f81f72 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -102,8 +102,7 @@ export class KafkaApi extends TrackedConstruct { if (props.clientAuthentication.tlsProps?.certificateAuthorities) { if (props.serviceToken) { - this.mskAclServiceToken = props.serviceToken; - this.serviceToken = props.serviceToken; + this.mskAclServiceToken = this.serviceToken = props.serviceToken; } else { const mskAclProvider = mskAclAdminProviderSetup( this, @@ -128,8 +127,7 @@ export class KafkaApi extends TrackedConstruct { if ( props.clientAuthentication.saslProps?.iam) { if (props.serviceToken) { - this.mskIamServiceToken = props.serviceToken; - this.serviceToken = props.serviceToken; + this.mskIamServiceToken = this.serviceToken = props.serviceToken; } else { const mskIamProvider = mskIamCrudProviderSetup( this, diff --git a/framework/test/unit/streaming/kafka-api.test.ts b/framework/test/unit/streaming/kafka-api.test.ts index aa17016d6..171827fe7 100644 --- a/framework/test/unit/streaming/kafka-api.test.ts +++ b/framework/test/unit/streaming/kafka-api.test.ts @@ -1250,64 +1250,4 @@ describe('Using global removal policy and DELETE construct removal policy ', () DeletionPolicy: 'Delete', }); }); -}); - -describe('Using default KafkaApi configuration with MSK provisioned and IAM and mTLS authentication should ', () => { - - const app = new App(); - const stack = new Stack(app, 'Stack'); - - const brokerSecurityGroup = SecurityGroup.fromSecurityGroupId(stack, 'sg', 'sg-1234'); - const vpc = Vpc.fromVpcAttributes(stack, 'vpc', { - vpcId: 'XXXXXXXX', - availabilityZones: ['us-east-1a'], - vpcCidrBlock: '10.0.0.0/16', - privateSubnetIds: ['XXXXXXXX'], - }); - - const cluster = new CfnCluster(stack, 'MyCluster', { - clientAuthentication: { - sasl: { - iam: { - enabled: true, - }, - }, - }, - brokerNodeGroupInfo: { - clientSubnets: vpc.privateSubnets.map(s => s.subnetId), - instanceType: 'kafka.m5large', - securityGroups: [brokerSecurityGroup.securityGroupId], - }, - clusterName: 'XXXXXX', - kafkaVersion: '3.5.1', - numberOfBrokerNodes: 3, - }); - - const kafkaApi = new KafkaApi(stack, 'KafkaApi', { - clusterArn: cluster.attrArn, - clusterType: MskClusterType.PROVISIONED, - brokerSecurityGroup, - vpc, - clientAuthentication: ClientAuthentication.sasl({ - iam: true, - }), - kafkaClientLogLevel: KafkaClientLogLevel.DEBUG, - serviceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', - }); - - kafkaApi.setTopic('topic1', - Authentication.IAM, - { - topic: 'topic1', - numPartitions: 1, - }, - ); - - const template = Template.fromStack(stack, {}); - - test('should have a service token to reuse', () => { - template.hasResourceProperties('Custom::MskTopic', { - ServiceToken: 'arn:aws:lambda::XXXXXX:function:XXXXXX-kafkaApiMskIamProviderCustomResour-XXXXXX', - }); - }); }); \ No newline at end of file From 083a26c36c4dad40d2b90d457bba3f7675c14f27 Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 11 Sep 2024 23:55:30 +0200 Subject: [PATCH 12/14] removing extra lib --- framework/test/unit/streaming/kafka-api.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/test/unit/streaming/kafka-api.test.ts b/framework/test/unit/streaming/kafka-api.test.ts index 171827fe7..912a9ebd0 100644 --- a/framework/test/unit/streaming/kafka-api.test.ts +++ b/framework/test/unit/streaming/kafka-api.test.ts @@ -15,7 +15,7 @@ import { SecurityGroup, Vpc, Subnet } from 'aws-cdk-lib/aws-ec2'; import { Role } from 'aws-cdk-lib/aws-iam'; import { CfnCluster } from 'aws-cdk-lib/aws-msk'; import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; -import { Authentication, ClientAuthentication, KafkaApi, MskClusterType, KafkaClientLogLevel } from '../../../src/streaming'; +import { Authentication, ClientAuthentication, KafkaApi, MskClusterType } from '../../../src/streaming'; describe('Using default KafkaApi configuration with MSK provisioned and IAM and mTLS authentication should ', () => { From 2a4ea7ad1b5a55d7d29062f6b50017907e107cdf Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Thu, 12 Sep 2024 22:40:00 +0200 Subject: [PATCH 13/14] updated doc and removing servicetoken assign in case we receive the service token by parameters --- framework/src/streaming/README.md | 2 +- framework/src/streaming/lib/msk/kafka-api.ts | 4 +-- .../kafka-api-byo-service-token.e2e.test.ts | 32 ++++++++----------- 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/framework/src/streaming/README.md b/framework/src/streaming/README.md index aa55d2f89..0a64b0a8f 100644 --- a/framework/src/streaming/README.md +++ b/framework/src/streaming/README.md @@ -202,7 +202,7 @@ The construct leverages the [CDK Provider Framework](https://docs.aws.amazon.com [example kafka api](./examples/kafka-api-default.lit.ts) -When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resource created, therefore the number of IP assigned to the custom resources. +When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resources created like lambdas and ENI that are used to create and manage the lifecycle the custom resources, like ACLs and Topics. :::warning diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index 0f5f81f72..c67b2e148 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -102,7 +102,7 @@ export class KafkaApi extends TrackedConstruct { if (props.clientAuthentication.tlsProps?.certificateAuthorities) { if (props.serviceToken) { - this.mskAclServiceToken = this.serviceToken = props.serviceToken; + this.mskAclServiceToken = props.serviceToken; } else { const mskAclProvider = mskAclAdminProviderSetup( this, @@ -127,7 +127,7 @@ export class KafkaApi extends TrackedConstruct { if ( props.clientAuthentication.saslProps?.iam) { if (props.serviceToken) { - this.mskIamServiceToken = this.serviceToken = props.serviceToken; + this.mskIamServiceToken = props.serviceToken; } else { const mskIamProvider = mskIamCrudProviderSetup( this, diff --git a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts index d5022e15d..acbe2adae 100644 --- a/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts +++ b/framework/test/e2e/kafka-api-byo-service-token.e2e.test.ts @@ -70,7 +70,7 @@ let deployResult: Record; // GIVEN const testStackReuseServiceToken = new TestStack('KafkaAPiTestStackReuseServiceToken', app); -const { stack: stackReuseServiceToken} = testStackReuseServiceToken; +const { stack: stackReuseServiceToken } = testStackReuseServiceToken; stackReuseServiceToken.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); const importedServiceToken = Fn.importValue(stackNewToken.stackName+'-ServiceToken'); @@ -96,8 +96,8 @@ kafkaApiReuseServiceToken.setTopic('dummyTopicServiceToken', ); new CfnOutput(stackReuseServiceToken, 'reusedServiceToken', { - value: kafkaApiReuseServiceToken.serviceToken!, - }); + value: 'success', +}); let deployResultServiceToken: Record; @@ -109,28 +109,22 @@ beforeAll(async () => { test('MSK cluster created successfully', async () => { // THEN - expect(deployResult['clusterArn']).toContain('arn'); + expect(deployResult.clusterArn).toContain('arn'); }); test('Kafka API outputs service token successfully', async () => { - // THEN - const keyWithServiceToken = Object.keys(deployResult).find(key => key.includes('ServiceToken')); - if (keyWithServiceToken) { - expect(deployResult[keyWithServiceToken]).toContain('arn'); - } else { - throw new Error('ServiceToken not found in deploy result'); - } + // THEN + const keyWithServiceToken = Object.keys(deployResult).find(key => key.includes('ServiceToken')); + if (keyWithServiceToken) { + expect(deployResult[keyWithServiceToken]).toContain('arn'); + } else { + throw new Error('ServiceToken not found in deploy result'); + } }); test('Kafka API reuses service token successfully', async () => { - // THEN - expect(deployResultServiceToken.reusedServiceToken).toContain('arn'); - const keyWithServiceToken = Object.keys(deployResult).find(key => key.includes('ServiceToken')); - if (keyWithServiceToken) { - expect(deployResult[keyWithServiceToken]).toEqual(deployResultServiceToken.reusedServiceToken); - } else { - throw new Error('ServiceToken not found in deploy result'); - } + // THEN + expect(deployResultServiceToken.reusedServiceToken).toEqual('success'); }); afterAll(async () => { From 8b34ca29992874c2e3d5115f084e3f28690991d5 Mon Sep 17 00:00:00 2001 From: lmouhib Date: Fri, 13 Sep 2024 08:56:51 +0100 Subject: [PATCH 14/14] fix website generated content --- .../docs/constructs/library/generated/_streaming-kafka-api.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx index 5e0fd5856..11cfce562 100644 --- a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx +++ b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx @@ -81,7 +81,7 @@ kafka_api = KafkaApi(stack, "kafkaApi", -When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resource created, therefore the number of IP assigned to the custom resources. +When deploying multiple stacks with the Kafka Api, if there is an already existing service token deployed for the custom resource, you can reuse it to reduce the number of resources created like lambdas and ENI that are used to create and manage the lifecycle the custom resources, like ACLs and Topics. :::warning