From 9735429d1a9079bb50cd195edaae89278693b33d Mon Sep 17 00:00:00 2001 From: Armando Segnini Date: Wed, 12 Jun 2024 10:13:39 +0200 Subject: [PATCH] feat(kafka-api): Add IAM as Authentication method when creating ACL with kafka api (#655) * Support setting ACLs in cluster with heterogeneous authentication, use IAM as Authentication method when creating ACL with kafka api --- framework/API.md | 169 +++++++++++----- framework/src/streaming/README.md | 7 +- .../examples/kafka-api-set-acl.lit.ts | 5 +- .../examples/msk-provisioned-set-acl.lit.ts | 6 +- .../examples/msk-provisioned-set-topic.lit.ts | 10 + .../examples/msk-serverless-set-topic.lit.ts | 10 + framework/src/streaming/lib/msk/kafka-api.ts | 100 ++++++---- .../src/streaming/lib/msk/msk-helpers.ts | 11 ++ .../src/streaming/lib/msk/msk-provisioned.ts | 89 +++++---- framework/src/streaming/lib/msk/msk-utils.ts | 7 +- .../resources/lambdas/crudIam/acl-crud.mjs | 103 ++++++++++ .../msk/resources/lambdas/crudIam/index.mjs | 137 ++----------- .../resources/lambdas/crudIam/topic-crud.mjs | 183 ++++++++++++++++++ .../lambdas/tlsCrudAdminClient/index.mjs | 1 + .../test/e2e/msk-provisioned-tls.e2e.test.ts | 123 +++++++++++- .../test/unit/streaming/kafka-api.test.ts | 122 ++++++++++++ .../unit/streaming/msk-provisioned.test.ts | 6 +- .../docs/constructs/cookbook/contributing.md | 12 +- .../generated/_streaming-kafka-api.mdx | 10 +- .../generated/_streaming-msk-provisioned.mdx | 28 ++- .../generated/_streaming-msk-serverless.mdx | 20 +- 21 files changed, 892 insertions(+), 267 deletions(-) create mode 100644 framework/src/streaming/lib/msk/resources/lambdas/crudIam/acl-crud.mjs create mode 100644 framework/src/streaming/lib/msk/resources/lambdas/crudIam/topic-crud.mjs diff --git a/framework/API.md b/framework/API.md index fbf6ce968..023efe03e 100644 --- a/framework/API.md +++ b/framework/API.md @@ -4059,7 +4059,7 @@ The security group for Client VPN Endpoint. ### KafkaApi -A construct to create an MSK Serverless cluster. +A construct to create a Kafka API admin client. > [https://awslabs.github.io/data-solutions-framework-on-aws/](https://awslabs.github.io/data-solutions-framework-on-aws/) @@ -4106,11 +4106,11 @@ the ID of the CDK Construct. | **Name** | **Description** | | --- | --- | | toString | Returns a string representation of this construct. | -| grantConsume | Grant a principal the right to consume data from a topic. | -| grantProduce | Grant a principal to produce data to a topic. | +| grantConsume | Grant a principal permissions to consume from a topic. | +| grantProduce | Grant a principal permissions to produce to a topic. | | retrieveVersion | Retrieve DSF package.json version. | -| setAcl | Creates a topic in the Msk Cluster. | -| setTopic | Creates a topic in the Msk Cluster. | +| setAcl | Creates a ACL in the MSK Cluster. | +| setTopic | Creates a topic in the MSK Cluster. | --- @@ -4125,16 +4125,16 @@ Returns a string representation of this construct. ##### `grantConsume` ```typescript -public grantConsume(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy): CustomResource +public grantConsume(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy, customResourceAuthentication?: Authentication): CustomResource ``` -Grant a principal the right to consume data from a topic. +Grant a principal permissions to consume from a topic. ###### `id`Required - *Type:* string -the CDK resource id. +the CDK resource ID. --- @@ -4142,7 +4142,7 @@ the CDK resource id. - *Type:* string -the topic to which the principal can produce data. +the target topic to grant consume permissions on. --- @@ -4150,7 +4150,7 @@ the topic to which the principal can produce data. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when grant on resource. +The authentication mode of the consumer. --- @@ -4158,7 +4158,7 @@ The client authentication to use when grant on resource. - *Type:* string | aws-cdk-lib.aws_iam.IPrincipal -the IAM principal to grand the produce to. +the principal receiveing grant consume permissions. --- @@ -4166,7 +4166,7 @@ the IAM principal to grand the produce to. - *Type:* string -the host to which the principal can produce data. +the host of the consumer. --- @@ -4178,19 +4178,27 @@ the removal policy to apply to the grant. --- +###### `customResourceAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + +--- + ##### `grantProduce` ```typescript -public grantProduce(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy): CustomResource +public grantProduce(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy, customResourceAuthentication?: Authentication): CustomResource ``` -Grant a principal to produce data to a topic. +Grant a principal permissions to produce to a topic. ###### `id`Required - *Type:* string -the CDK resource id. +the CDK resource ID. --- @@ -4198,7 +4206,7 @@ the CDK resource id. - *Type:* string -the topic to which the principal can produce data. +the target topic to grant produce permissions on. --- @@ -4206,7 +4214,7 @@ the topic to which the principal can produce data. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when grant on resource. +The authentication mode of the producer. --- @@ -4214,7 +4222,7 @@ The client authentication to use when grant on resource. - *Type:* string | aws-cdk-lib.aws_iam.IPrincipal -the IAM principal to grand the produce to. +the principal receiving grant produce permissions. --- @@ -4222,7 +4230,7 @@ the IAM principal to grand the produce to. - *Type:* string -the host to which the principal can produce data. +the host of the producer. --- @@ -4234,6 +4242,14 @@ the removal policy to apply to the grant. --- +###### `customResourceAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + +--- + ##### `retrieveVersion` ```typescript @@ -4245,16 +4261,16 @@ Retrieve DSF package.json version. ##### `setAcl` ```typescript -public setAcl(id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy): CustomResource +public setAcl(id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy, clientAuthentication?: Authentication): CustomResource ``` -Creates a topic in the Msk Cluster. +Creates a ACL in the MSK Cluster. ###### `id`Required - *Type:* string -the CDK id for Topic. +the CDK ID of the ACL. --- @@ -4262,7 +4278,7 @@ the CDK id for Topic. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Acl -the Kafka Acl definition. +the Kafka ACL definition. --- @@ -4274,19 +4290,27 @@ Wether to keep the ACL or delete it when removing the resource from the Stack. --- +###### `clientAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + +--- + ##### `setTopic` ```typescript public setTopic(id: string, clientAuthentication: Authentication, topicDefinition: MskTopic, removalPolicy?: RemovalPolicy, waitForLeaders?: boolean, timeout?: number): CustomResource ``` -Creates a topic in the Msk Cluster. +Creates a topic in the MSK Cluster. ###### `id`Required - *Type:* string -the CDK id for Topic. +the CDK ID for Topic. --- @@ -4294,7 +4318,7 @@ the CDK id for Topic. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when creating the Topic. +The authentication used by the Kafka API admin client to create the topic. --- @@ -4318,7 +4342,7 @@ Wether to keep the topic or delete it when removing the resource from the Stack. - *Type:* boolean -If this is true it will wait until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE. +If set to true, waits until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE. --- @@ -4581,11 +4605,11 @@ the ID of the CDK Construct. | toString | Returns a string representation of this construct. | | deleteClusterPolicy | *No description.* | | getBootstrapBrokers | Method to get bootstrap broker connection string based on the authentication mode. | -| grantConsume | Grant a principal the right to consume data from a topic. | -| grantProduce | Grant a principal to produce data to a topic. | +| grantConsume | Grant a principal permissions to consume from a topic. | +| grantProduce | Grant a principal permissions to produce to a topic. | | putClusterPolicy | *No description.* | | retrieveVersion | Retrieve DSF package.json version. | -| setAcl | Creates a topic in the Msk Cluster. | +| setAcl | Creates ACL in the Msk Cluster. | | setTopic | Creates a topic in the Msk Cluster. | --- @@ -4623,16 +4647,16 @@ the authentication mode. ##### `grantConsume` ```typescript -public grantConsume(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy): CustomResource +public grantConsume(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy, customResourceAuthentication?: Authentication): CustomResource ``` -Grant a principal the right to consume data from a topic. +Grant a principal permissions to consume from a topic. ###### `id`Required - *Type:* string -the CDK resource id. +the CDK resource ID. --- @@ -4640,7 +4664,7 @@ the CDK resource id. - *Type:* string -the topic to which the principal can produce data. +the target topic to grant consume permissions on. --- @@ -4648,7 +4672,7 @@ the topic to which the principal can produce data. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when grant on resource. +The authentication mode of the consumer. --- @@ -4656,7 +4680,7 @@ The client authentication to use when grant on resource. - *Type:* string | aws-cdk-lib.aws_iam.IPrincipal -the IAM principal to grand the produce to. +the principal receiveing grant consume permissions. --- @@ -4664,7 +4688,7 @@ the IAM principal to grand the produce to. - *Type:* string -the host to which the principal can produce data. +the host of the consumer. --- @@ -4672,21 +4696,31 @@ the host to which the principal can produce data. - *Type:* aws-cdk-lib.RemovalPolicy +the removal policy to apply to the grant. + +--- + +###### `customResourceAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + --- ##### `grantProduce` ```typescript -public grantProduce(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy): CustomResource +public grantProduce(id: string, topicName: string, clientAuthentication: Authentication, principal: string | IPrincipal, host?: string, removalPolicy?: RemovalPolicy, customResourceAuthentication?: Authentication): CustomResource ``` -Grant a principal to produce data to a topic. +Grant a principal permissions to produce to a topic. ###### `id`Required - *Type:* string -the CDK resource id. +the CDK resource ID. --- @@ -4694,7 +4728,7 @@ the CDK resource id. - *Type:* string -the topic to which the principal can produce data. +the target topic to grant produce permissions on. --- @@ -4702,7 +4736,7 @@ the topic to which the principal can produce data. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when grant on resource. +The authentication mode of the producer. --- @@ -4710,7 +4744,7 @@ The client authentication to use when grant on resource. - *Type:* string | aws-cdk-lib.aws_iam.IPrincipal -the IAM principal to grand the produce to. +the principal receiving grant produce permissions. --- @@ -4718,7 +4752,7 @@ the IAM principal to grand the produce to. - *Type:* string -the host to which the principal can produce data. +the host of the producer. --- @@ -4726,6 +4760,16 @@ the host to which the principal can produce data. - *Type:* aws-cdk-lib.RemovalPolicy +the removal policy to apply to the grant. + +--- + +###### `customResourceAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + --- ##### `putClusterPolicy` @@ -4763,16 +4807,16 @@ Retrieve DSF package.json version. ##### `setAcl` ```typescript -public setAcl(id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy): CustomResource +public setAcl(id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy, clientAuthentication?: Authentication): CustomResource ``` -Creates a topic in the Msk Cluster. +Creates ACL in the Msk Cluster. ###### `id`Required - *Type:* string -the CDK id for Topic. +the CDK ID of the ACL. --- @@ -4792,6 +4836,14 @@ Wether to keep the ACL or delete it when removing the resource from the Stack {@ --- +###### `clientAuthentication`Optional + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication + +The authentication used by the Kafka API admin client to create the ACL. + +--- + ##### `setTopic` ```typescript @@ -4804,7 +4856,7 @@ Creates a topic in the Msk Cluster. - *Type:* string -the CDK id for Topic. +the CDK ID of the Topic. --- @@ -4812,7 +4864,7 @@ the CDK id for Topic. - *Type:* @cdklabs/aws-data-solutions-framework.streaming.Authentication -The client authentication to use when creating the Topic. +The authentication used by the Kafka API admin client to create the topic. --- @@ -15747,7 +15799,7 @@ public readonly iam: boolean; ``` - *Type:* boolean -- *Default:* true +- *Default:* false Enable IAM access control. @@ -15781,7 +15833,7 @@ public readonly iam: boolean; ``` - *Type:* boolean -- *Default:* true +- *Default:* false Enable IAM access control. @@ -15828,7 +15880,7 @@ public readonly iam: boolean; ``` - *Type:* boolean -- *Default:* true +- *Default:* false Enable IAM access control. @@ -17683,6 +17735,7 @@ cluster version number. | V3_3_2 | @cdklabs/aws-data-solutions-framework.streaming.KafkaVersion | Kafka version 3.3.2. | | V3_4_0 | @cdklabs/aws-data-solutions-framework.streaming.KafkaVersion | Kafka version 3.4.0. | | V3_5_1 | @cdklabs/aws-data-solutions-framework.streaming.KafkaVersion | Kafka version 3.5.1. | +| V3_6_0 | @cdklabs/aws-data-solutions-framework.streaming.KafkaVersion | Kafka version 3.6.0. | --- @@ -17942,6 +17995,18 @@ Kafka version 3.5.1. --- +##### `V3_6_0`Required + +```typescript +public readonly V3_6_0: KafkaVersion; +``` + +- *Type:* @cdklabs/aws-data-solutions-framework.streaming.KafkaVersion + +Kafka version 3.6.0. + +--- + ### MskBrokerInstanceType Kafka cluster version. diff --git a/framework/src/streaming/README.md b/framework/src/streaming/README.md index f3fd4a831..06bbc2426 100644 --- a/framework/src/streaming/README.md +++ b/framework/src/streaming/README.md @@ -91,6 +91,8 @@ The topic is defined by the property type called `MskACL`. This method should be } ``` +You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses IAM as authentication. + [example msk provisiond setACL](./examples/msk-provisioned-set-acl.lit.ts) ### grantProduce @@ -231,6 +233,7 @@ The topic is defined by the property type called `MskTopic`. Below you can see t topic: , numPartitions: , // default: -1 (uses broker `num.partitions` configuration) replicationFactor: , // default: -1 (uses broker `default.replication.factor` configuration) + configEntries: // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: [] } ``` @@ -247,7 +250,7 @@ Only the number of partitions can be updated after the creation of the topic. ### setACL This method allows you to create, update or delete an ACL. Its backend uses [kafkajs](https://kafka.js.org/). -The topic is defined by the property type called `MskACL`. This method should be used only when the cluster authentication is set to `mTLS`. Below you can see the definition of the topic as well as an example of use. +The topic is defined by the property type called `MskACL`. This method can be used when the cluster authentication is set to `mTLS` or `IAM`+`mTLS`. Below you can see the definition of the ACL as well as an example of use. ```json { @@ -261,6 +264,8 @@ The topic is defined by the property type called `MskACL`. This method should be } ``` +You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses mTLS as authentication. + [example msk provisiond setACL](./examples/kafka-api-set-acl.lit.ts) ### grantProduce diff --git a/framework/src/streaming/examples/kafka-api-set-acl.lit.ts b/framework/src/streaming/examples/kafka-api-set-acl.lit.ts index 60d1b1fda..7e5e12808 100644 --- a/framework/src/streaming/examples/kafka-api-set-acl.lit.ts +++ b/framework/src/streaming/examples/kafka-api-set-acl.lit.ts @@ -1,6 +1,6 @@ import * as cdk from 'aws-cdk-lib'; import { SecurityGroup, Vpc } from 'aws-cdk-lib/aws-ec2'; -import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaClientLogLevel, MskClusterType, ResourcePatternTypes } from '../lib/msk'; +import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaClientLogLevel, MskClusterType, ResourcePatternTypes, Authentication } from '../lib/msk'; import { KafkaApi } from '../lib/msk/kafka-api'; import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca'; import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; @@ -56,6 +56,7 @@ kafkaApi.setAcl('acl', operation: AclOperationTypes.CREATE, permissionType: AclPermissionTypes.ALLOW, }, - cdk.RemovalPolicy.DESTROY + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS ); /// !hide diff --git a/framework/src/streaming/examples/msk-provisioned-set-acl.lit.ts b/framework/src/streaming/examples/msk-provisioned-set-acl.lit.ts index 8c343815f..8f2fd43e8 100644 --- a/framework/src/streaming/examples/msk-provisioned-set-acl.lit.ts +++ b/framework/src/streaming/examples/msk-provisioned-set-acl.lit.ts @@ -1,5 +1,5 @@ import * as cdk from 'aws-cdk-lib'; -import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, MskProvisioned, ResourcePatternTypes } from '../lib/msk'; +import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, MskProvisioned, ResourcePatternTypes, Authentication } from '../lib/msk'; const app = new cdk.App(); @@ -21,5 +21,7 @@ msk.setAcl('acl', { operation: AclOperationTypes.CREATE, permissionType: AclPermissionTypes.ALLOW, }, - cdk.RemovalPolicy.DESTROY); + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); /// !hide diff --git a/framework/src/streaming/examples/msk-provisioned-set-topic.lit.ts b/framework/src/streaming/examples/msk-provisioned-set-topic.lit.ts index 5a5cac317..2cde9f662 100644 --- a/framework/src/streaming/examples/msk-provisioned-set-topic.lit.ts +++ b/framework/src/streaming/examples/msk-provisioned-set-topic.lit.ts @@ -17,6 +17,16 @@ msk.setTopic('topic1', topic: 'topic1', numPartitions: 3, replicationFactor: 1, + configEntries: [ + { + name: 'retention.ms', + value: '90000', + }, + { + name: 'retention.bytes', + value: '90000', + }, + ], }, cdk.RemovalPolicy.DESTROY, false, 1500); /// !hide diff --git a/framework/src/streaming/examples/msk-serverless-set-topic.lit.ts b/framework/src/streaming/examples/msk-serverless-set-topic.lit.ts index 6754727e8..db76a262e 100644 --- a/framework/src/streaming/examples/msk-serverless-set-topic.lit.ts +++ b/framework/src/streaming/examples/msk-serverless-set-topic.lit.ts @@ -15,6 +15,16 @@ let topic: MskTopic = { topic: 'topic1', numPartitions: 3, replicationFactor: 1, + configEntries: [ + { + name: 'retention.ms', + value: '90000', + }, + { + name: 'retention.bytes', + value: '90000', + }, + ], } msk.addTopic('topic1', topic, cdk.RemovalPolicy.DESTROY, false, 1500); diff --git a/framework/src/streaming/lib/msk/kafka-api.ts b/framework/src/streaming/lib/msk/kafka-api.ts index 6715932b6..f6c4d61a1 100644 --- a/framework/src/streaming/lib/msk/kafka-api.ts +++ b/framework/src/streaming/lib/msk/kafka-api.ts @@ -19,7 +19,7 @@ import { import { Context, TrackedConstruct, TrackedConstructProps } from '../../../utils'; /** - * A construct to create an MSK Serverless cluster + * A construct to create a Kafka API admin client * @see https://awslabs.github.io/data-solutions-framework-on-aws/ * * @example @@ -69,7 +69,7 @@ export class KafkaApi extends TrackedConstruct { private readonly clusterType: MskClusterType; /** - * Constructs a new instance of the EmrEksCluster construct. + * Constructs a new instance of the Kafka API construct. * @param {Construct} scope the Scope of the CDK Construct * @param {string} id the ID of the CDK Construct * @param {MskServerlessProps} props @@ -136,20 +136,38 @@ export class KafkaApi extends TrackedConstruct { } /** - * Creates a topic in the Msk Cluster - * @param {string} id the CDK id for Topic - * @param {Acl} aclDefinition the Kafka Acl definition + * Creates a ACL in the MSK Cluster + * @param {string} id the CDK ID of the ACL + * @param {Acl} aclDefinition the Kafka ACL definition * @param {RemovalPolicy} removalPolicy Wether to keep the ACL or delete it when removing the resource from the Stack. @default - RemovalPolicy.RETAIN - * @returns {CustomResource} The MskAcl custom resource created + * @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the ACL @default - Authentication.MTLS + * @returns {CustomResource} The MskAcl custom resource created by the Kafka API admin client */ public setAcl( id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy, + clientAuthentication?: Authentication, ): CustomResource { + let serviceToken: string; + + let customResourceAuthentication = clientAuthentication ?? Authentication.MTLS; + + if (customResourceAuthentication === Authentication.IAM) { + if (!this.mskIamServiceToken) { + throw Error('IAM Authentication is not supported for this cluster'); + } + serviceToken = this.mskIamServiceToken!; + } else { + if (!this.mskAclServiceToken) { + throw Error('MTLS Authentication is not supported for this cluster'); + } + serviceToken = this.mskAclServiceToken!; + } + const cr = new CustomResource(this, id, { - serviceToken: this.mskAclServiceToken!, + serviceToken: serviceToken, properties: { logLevel: this.kafkaClientLogLevel, secretArn: this.tlsCertifacateSecret?.secretArn, @@ -171,14 +189,14 @@ export class KafkaApi extends TrackedConstruct { } /** - * Creates a topic in the Msk Cluster - * @param {string} id the CDK id for Topic - * @param {Authentication} clientAuthentication The client authentication to use when creating the Topic + * Creates a topic in the MSK Cluster + * @param {string} id the CDK ID for Topic + * @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the topic * @param {MskTopic} topicDefinition the Kafka topic definition * @param {RemovalPolicy} removalPolicy Wether to keep the topic or delete it when removing the resource from the Stack. @default - RemovalPolicy.RETAIN - * @param {boolean} waitForLeaders If this is true it will wait until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE + * @param {boolean} waitForLeaders If set to true, waits until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE * @param {number} timeout The time in ms to wait for a topic to be completely created on the controller node @default - 5000 - * @returns {CustomResource} The MskTopic custom resource created + * @returns {CustomResource} The MskTopic custom resource created by the Kafka API admin client */ public setTopic( id: string, @@ -192,17 +210,22 @@ export class KafkaApi extends TrackedConstruct { let region = Stack.of(this).region; if (this.clusterType === MskClusterType.SERVERLESS && topicDefinition.replicationFactor !== undefined) { - // (topicDefinition.replicaAssignment !== undefined || topicDefinition.replicationFactor !== undefined)) { throw new Error("The topic definition is incorrect: MSK Serverless doesn't support replication factor and replication assignments"); } if (clientAuthentication === Authentication.IAM) { + if (!this.mskIamServiceToken) { + throw Error('IAM Authentication is not supported for this cluster'); + } serviceToken = this.mskIamServiceToken!; } else { + if (!this.mskAclServiceToken) { + throw Error('MTLS Authentication is not supported for this cluster'); + } serviceToken = this.mskAclServiceToken!; } - // Create custom resource with async waiter until the Amazon EMR Managed Endpoint is created + // Create custom resource with async waiter until the MSK topic is created const cr = new CustomResource(this, id, { serviceToken: serviceToken, properties: { @@ -223,15 +246,15 @@ export class KafkaApi extends TrackedConstruct { } /** - * Grant a principal to produce data to a topic - * @param {string} id the CDK resource id - * @param {string} topicName the topic to which the principal can produce data - * @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource - * @param {IPrincipal | string } principal the IAM principal to grand the produce to - * @param {string} host the host to which the principal can produce data. @default - * is used + * Grant a principal permissions to produce to a topic + * @param {string} id the CDK resource ID + * @param {string} topicName the target topic to grant produce permissions on + * @param {Authentication} clientAuthentication The authentication mode of the producer + * @param {IPrincipal | string } principal the principal receiving grant produce permissions + * @param {string} host the host of the producer. @default - * is used * @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used - * @returns When MTLS is used as authentication an ACL is created using the MskAcl Custom resource to write in the topic is created and returned, - * you can use it to add dependency on it. + * @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer) + * @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication */ public grantProduce( id: string, @@ -239,7 +262,12 @@ export class KafkaApi extends TrackedConstruct { clientAuthentication: Authentication, principal: IPrincipal | string, host?: string, - removalPolicy?: RemovalPolicy) : CustomResource | undefined { + removalPolicy?: RemovalPolicy, + customResourceAuthentication?: Authentication, + ) : CustomResource | undefined { + + + let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication; if (clientAuthentication === Authentication.IAM) { @@ -272,24 +300,23 @@ export class KafkaApi extends TrackedConstruct { permissionType: AclPermissionTypes.ALLOW, }, Context.revertRemovalPolicy(this, removalPolicy), + authentication, ); return cr; } - - } /** - * Grant a principal the right to consume data from a topic - * @param {string} id the CDK resource id - * @param {string} topicName the topic to which the principal can produce data - * @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource - * @param {IPrincipal | string } principal the IAM principal to grand the produce to - * @param {string} host the host to which the principal can produce data. @default - * is used + * Grant a principal permissions to consume from a topic + * @param {string} id the CDK resource ID + * @param {string} topicName the target topic to grant consume permissions on + * @param {Authentication} clientAuthentication The authentication mode of the consumer + * @param {IPrincipal | string } principal the principal receiveing grant consume permissions + * @param {string} host the host of the consumer. @default - * is used * @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used - * @returns When MTLS is used as authentication an ACL is created using the MskAcl Custom resource to read from the topic is created and returned, - * you can use it to add dependency on it. + * @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer) + * @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication */ public grantConsume( id: string, @@ -297,7 +324,11 @@ export class KafkaApi extends TrackedConstruct { clientAuthentication: Authentication, principal: IPrincipal | string, host?: string, - removalPolicy?: RemovalPolicy) : CustomResource | undefined { + removalPolicy?: RemovalPolicy, + customResourceAuthentication?: Authentication, + ) : CustomResource | undefined { + + let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication; if (clientAuthentication === Authentication.IAM) { @@ -331,6 +362,7 @@ export class KafkaApi extends TrackedConstruct { permissionType: AclPermissionTypes.ALLOW, }, removalPolicy ?? RemovalPolicy.DESTROY, + authentication, ); return cr; diff --git a/framework/src/streaming/lib/msk/msk-helpers.ts b/framework/src/streaming/lib/msk/msk-helpers.ts index a600f16e5..c77806f9d 100644 --- a/framework/src/streaming/lib/msk/msk-helpers.ts +++ b/framework/src/streaming/lib/msk/msk-helpers.ts @@ -64,6 +64,8 @@ export function mskIamCrudProviderSetup( actions: [ 'kafka-cluster:Connect', 'kafka:GetBootstrapBrokers', + 'kafka:DescribeCluster', + 'kafka-cluster:AlterCluster', ], resources: [ clusterArn, @@ -81,6 +83,15 @@ export function mskIamCrudProviderSetup( `arn:${partition}:kafka:${region}:${account}:topic/${clusterNameUuid}/*`, ], }), + new PolicyStatement({ + actions: [ + 'kafka-cluster:AlterGroup', + 'kafka-cluster:DescribeGroup', + ], + resources: [ + `arn:${partition}:kafka:${region}:${account}:group/${clusterNameUuid}/*`, + ], + }), ]; //Attach policy to IAM Role diff --git a/framework/src/streaming/lib/msk/msk-provisioned.ts b/framework/src/streaming/lib/msk/msk-provisioned.ts index 25d0ac462..9e3cac93e 100644 --- a/framework/src/streaming/lib/msk/msk-provisioned.ts +++ b/framework/src/streaming/lib/msk/msk-provisioned.ts @@ -193,7 +193,7 @@ export class MskProvisioned extends TrackedConstruct { private readonly placeClusterHandlerInVpc?: boolean; /** - * Constructs a new instance of the EmrEksCluster construct. + * Constructs a new instance of the MSK Provisioned cluster construct. * @param {Construct} scope the Scope of the CDK Construct * @param {string} id the ID of the CDK Construct * @param {MskServerlessProps} props @@ -635,23 +635,27 @@ export class MskProvisioned extends TrackedConstruct { /** - * Creates a topic in the Msk Cluster - * @param {string} id the CDK id for Topic + * Creates ACL in the Msk Cluster + * @param {string} id the CDK ID of the ACL * @param {Acl} aclDefinition the Kafka Acl definition * @param {RemovalPolicy} removalPolicy Wether to keep the ACL or delete it when removing the resource from the Stack {@default RemovalPolicy.RETAIN} - * @returns {CustomResource} The MskAcl custom resource created + * @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the ACL @default - Authentication.MTLS + * @returns {CustomResource} The MskAcl custom resource created by the Kafka API admin client */ public setAcl( id: string, aclDefinition: Acl, removalPolicy?: RemovalPolicy, + clientAuthentication?: Authentication, ): CustomResource { - if (!this.inClusterAcl) { - throw Error('Setting ACLs is only supported with TLS and SASL/SCRAM'); - } + let customResourceAuthentication = clientAuthentication ?? Authentication.MTLS; - const cr = this.kafkaApi.setAcl(id, aclDefinition, removalPolicy); + const cr = this.kafkaApi.setAcl(id, + aclDefinition, + removalPolicy, + customResourceAuthentication, + ); if (aclDefinition.principal !== this.crPrincipal && this.inClusterAcl) { cr.node.addDependency(this.aclOperationCr!); @@ -664,14 +668,13 @@ export class MskProvisioned extends TrackedConstruct { /** * Creates a topic in the Msk Cluster - * - * @param {string} id the CDK id for Topic - * @param {Authentication} clientAuthentication The client authentication to use when creating the Topic + * @param {string} id the CDK ID of the Topic + * @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the topic * @param {MskTopic} topicDefinition the Kafka topic definition * @param {RemovalPolicy} removalPolicy Wether to keep the topic or delete it when removing the resource from the Stack {@default RemovalPolicy.RETAIN} * @param {boolean} waitForLeaders If this is true it will wait until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE * @param {number} timeout The time in ms to wait for a topic to be completely created on the controller node @default 5000 - * @returns {CustomResource} The MskTopic custom resource created + * @returns {CustomResource} The MskTopic custom resource created by the Kafka API admin client */ public setTopic( id: string, @@ -702,15 +705,15 @@ export class MskProvisioned extends TrackedConstruct { } /** - * Grant a principal to produce data to a topic - * @param {string} id the CDK resource id - * @param {string} topicName the topic to which the principal can produce data - * @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource - * @param {IPrincipal | string } principal the IAM principal to grand the produce to - * @param {string} host the host to which the principal can produce data. - * @param {RemovalPolicy} removalPolicy - * @returns When MTLS is used as authentication an ACL is created using the MskAcl Custom resource to write in the topic is created and returned, - * you can use it to add dependency on it. + * Grant a principal permissions to produce to a topic + * @param {string} id the CDK resource ID + * @param {string} topicName the target topic to grant produce permissions on + * @param {Authentication} clientAuthentication The authentication mode of the producer + * @param {IPrincipal | string } principal the principal receiving grant produce permissions + * @param {string} host the host of the producer. + * @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used + * @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer) + * @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication */ public grantProduce( id: string, @@ -718,7 +721,11 @@ export class MskProvisioned extends TrackedConstruct { clientAuthentication: Authentication, principal: IPrincipal | string, host?: string, - removalPolicy?: RemovalPolicy): CustomResource | undefined { + removalPolicy?: RemovalPolicy, + customResourceAuthentication?: Authentication, + ): CustomResource | undefined { + + let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication; const cr = this.kafkaApi.grantProduce( id, @@ -727,6 +734,7 @@ export class MskProvisioned extends TrackedConstruct { principal, host, removalPolicy, + authentication, ); if (this.inClusterAcl && cr) { @@ -739,16 +747,15 @@ export class MskProvisioned extends TrackedConstruct { } /** - * Grant a principal the right to consume data from a topic - * - * @param {string} id the CDK resource id - * @param {string} topicName the topic to which the principal can produce data - * @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource - * @param {IPrincipal | string } principal the IAM principal to grand the produce to - * @param {string} host the host to which the principal can produce data. - * @param {RemovalPolicy} removalPolicy - * @returns When MTLS is used as authentication, an ACL is created using the MskAcl Custom resource to read from the topic is created and returned, - * you can use it to add dependency on it. + * Grant a principal permissions to consume from a topic + * @param {string} id the CDK resource ID + * @param {string} topicName the target topic to grant consume permissions on + * @param {Authentication} clientAuthentication The authentication mode of the consumer + * @param {IPrincipal | string } principal the principal receiveing grant consume permissions + * @param {string} host the host of the consumer. @default - * is used + * @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used + * @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer) + * @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication */ public grantConsume( id: string, @@ -756,7 +763,11 @@ export class MskProvisioned extends TrackedConstruct { clientAuthentication: Authentication, principal: IPrincipal | string, host?: string, - removalPolicy?: RemovalPolicy): CustomResource | undefined { + removalPolicy?: RemovalPolicy, + customResourceAuthentication?: Authentication, + ): CustomResource | undefined { + + let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication; const cr = this.kafkaApi.grantConsume( id, @@ -764,7 +775,9 @@ export class MskProvisioned extends TrackedConstruct { clientAuthentication, principal, host, - removalPolicy); + removalPolicy, + authentication, + ); if (this.inClusterAcl && cr) { cr.node.addDependency(this.aclOperationCr!); @@ -778,7 +791,7 @@ export class MskProvisioned extends TrackedConstruct { public putClusterPolicy(policy: string, id: string, currentVersion?: string) { if (!this.clusterVpcConnectivity) { - throw Error('PutClusterPolicy is Vpc Connectiviy is not setup'); + throw Error('Vpc Connectivity is not setup'); } // eslint-disable-next-line local-rules/no-tokens-in-construct-id @@ -808,7 +821,6 @@ export class MskProvisioned extends TrackedConstruct { throw Error('PutClusterPolicy is Vpc Connectiviy is not setup'); } - // eslint-disable-next-line local-rules/no-tokens-in-construct-id let clusterBootstrapBrokers = new AwsCustomResource(this, 'DeleteClusterPolicy', { onUpdate: { service: 'Kafka', @@ -827,7 +839,8 @@ export class MskProvisioned extends TrackedConstruct { } - private setAcls(props: MskProvisionedProps): CustomResource[] { + private setAcls(props: MskProvisionedProps, + ): CustomResource[] { let aclsResources: CustomResource[] = []; @@ -991,4 +1004,4 @@ export class MskProvisioned extends TrackedConstruct { return clusterBootstrapBrokers.getResponseField(responseField!); } -} +} \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/msk-utils.ts b/framework/src/streaming/lib/msk/msk-utils.ts index 5fba8fc2d..d250013d2 100644 --- a/framework/src/streaming/lib/msk/msk-utils.ts +++ b/framework/src/streaming/lib/msk/msk-utils.ts @@ -121,6 +121,11 @@ export class KafkaVersion { */ public static readonly V3_5_1 = KafkaVersion.of('3.5.1'); + /** + * Kafka version 3.6.0 + */ + public static readonly V3_6_0 = KafkaVersion.of('3.6.0'); + /** * Custom cluster version * @param version custom version number @@ -392,7 +397,7 @@ export interface SaslAuthProps { /** * Enable IAM access control. - * @default true + * @default - false */ readonly iam?: boolean; diff --git a/framework/src/streaming/lib/msk/resources/lambdas/crudIam/acl-crud.mjs b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/acl-crud.mjs new file mode 100644 index 000000000..557b1379d --- /dev/null +++ b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/acl-crud.mjs @@ -0,0 +1,103 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +export async function aclCrudOnEvent (event, admin) { + switch (event.RequestType) { + case 'Create': + + try { + + const acl = [ + { + resourceType: parseInt(event.ResourceProperties.resourceType), + resourcePatternType: parseInt(event.ResourceProperties.resourcePatternType), + resourceName: event.ResourceProperties.resourceName, + principal: event.ResourceProperties.principal, + host: event.ResourceProperties.host, + operation: parseInt(event.ResourceProperties.operation), + permissionType: parseInt(event.ResourceProperties.permissionType), + } + ]; + + await admin.createAcls({ acl }); + + await admin.disconnect(); + + break; + + } + catch (error) { + await admin.disconnect(); + + throw new Error(`Error applying ACL: ${event.ResourceProperties}. Error ${JSON.stringify(error)}`); + } + + case 'Update': + + try { + + const acl = [ + { + resourceType: parseInt(event.ResourceProperties.resourceType), + resourcePatternType: parseInt(event.ResourceProperties.resourcePatternType), + resourceName: event.ResourceProperties.resourceName, + principal: event.ResourceProperties.principal, + host: event.ResourceProperties.host, + operation: parseInt(event.ResourceProperties.operation), + permissionType: parseInt(event.ResourceProperties.permissionType), + } + ]; + + console.log(acl); + + await admin.createAcls({ acl }); + + await admin.disconnect(); + + break; + } + catch (error) { + await admin.disconnect(); + + throw new Error(`Error updating ACL: ${event.ResourceProperties}. Error ${JSON.stringify(error)}`); + } + + case 'Delete': + + try { + + const acl = [ + { + resourceType: parseInt(event.ResourceProperties.resourceType), + resourcePatternType: parseInt(event.ResourceProperties.resourcePatternType), + resourceName: event.ResourceProperties.resourceName, + principal: event.ResourceProperties.principal, + host: event.ResourceProperties.host, + operation: parseInt(event.ResourceProperties.operation), + permissionType: parseInt(event.ResourceProperties.permissionType), + } + ]; + + console.log(acl); + + let kafkaResponse = await admin.deleteAcls({ filters: acl }) + + let errorCode = kafkaResponse.filterResponses[0].errorCode; + + await admin.disconnect(); + return { + "Data": { + "kafkaResponse": errorCode == 0 ? true : false, + } + }; + } + catch (error) { + await admin.disconnect(); + + throw new Error(`Error deleting ACL: ${event.ResourceProperties}. Error ${JSON.stringify(error)}`); + } + + default: + throw new Error(`invalid request type: ${event.RequestType}`); + } +} \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/resources/lambdas/crudIam/index.mjs b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/index.mjs index e3b93dee0..82d058894 100644 --- a/framework/src/streaming/lib/msk/resources/lambdas/crudIam/index.mjs +++ b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/index.mjs @@ -4,6 +4,8 @@ import { Kafka, logLevel } from "kafkajs"; import { generateAuthToken } from "aws-msk-iam-sasl-signer-js"; import { KafkaClient, GetBootstrapBrokersCommand } from "@aws-sdk/client-kafka"; +import { aclCrudOnEvent } from "./acl-crud.mjs"; +import { topicCrudOnEvent } from "./topic-crud.mjs"; async function oauthBearerTokenProvider(region) { // Uses AWS Default Credentials Provider Chain to fetch credentials @@ -36,6 +38,8 @@ export const onEventHandler = async (event) => { console.log("Unknown Log Level"); throw new Error(`invalid log level: ${event.ResourceProperties.logLevel}`); } + + console.log('loglevel: '+ level); const client = new KafkaClient(); const input = { @@ -66,125 +70,20 @@ export const onEventHandler = async (event) => { console.info('======Received Event======='); console.info(event); - switch (event.RequestType) { - case 'Create': - - console.log(event.ResourceProperties.topic); - - try { - const result = await admin.createTopics({ - validateOnly: false, - waitForLeaders: event.ResourceProperties.waitForLeaders, - timeout: event.ResourceProperties.timeout, - topics: [event.ResourceProperties.topic], - }); - - console.log(`Topic created: ${result}`); - if ( result == false ) { - throw new Error(`Error creating topic: ${event.ResourceProperties.topic}`); - } - break; - - } - catch (error) { - console.log(`Error creating topic: ${JSON.stringify(error)}`); - throw new Error(`Error creating topic: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); - } - - case 'Update': - - console.info(event.ResourceProperties.topic); - - const oldTopic = event.OldResourceProperties.topic; - const newTopic = event.ResourceProperties.topic; - - if ( newTopic.numPartitions > oldTopic.numPartitions ) { - console.log("creating new partitions...") - try { - - const result = await admin.createPartitions({ - validateOnly: false, - timeout: event.ResourceProperties.timeout, - topicPartitions: [{ - topic: newTopic.topic, - count: newTopic.numPartitions, - assignments: undefined - }], - }); - - console.log(`Topic partition count updated: ${result}`); - } - catch (error) { - console.log(`Error updating topic number of partitions: ${JSON.stringify(error)}`); - throw new Error(`Error updating topic number of partitions: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); - } - } else if ( newTopic.numPartitions < oldTopic.numPartitions ) { - throw new Error(`Error updating topics: number of partitions can't be decreased`); - } - - // if (newTopic.replicationFactor > oldTopic.replicationFactor || newTopic.replicaAssignment !== oldTopic.replicaAssignment) { - // console.log(`Error updating topics: replication factor update is not supported`); - // } - - // if (newTopic.configEntries !== oldTopic.configEntries) { - // console.log(`Error updating topics: configuration entries update is not supported`); - // } - - // if (mskClusterType === 'PROVISIONED') { - // if (newTopic.replicationFactor > oldTopic.replicationFactor || newTopic.replicaAssignment !== oldTopic.replicaAssignment) { - // if (newTopic.replicaAssignment === oldTopic.replicaAssignment) { - // throw new Error(`Error updating topics: replication can only be increased by providing replicas assignment`); - // } else { - // console.log("updating partitions assignment...") - // try { - // const result = await admin.alterPartitionReassignments({ - // validateOnly: false, - // timeout: event.ResourceProperties.timeout, - // topics: [{ - // topic: newTopic.topic, - // partitionAssignment: newTopic.replicaAssignment, - // }], - // }); - - // console.log(`Topic replication factor updated: ${result}`); - // await admin.disconnect(); - // break; - // } - // catch (error) { - // await admin.disconnect(); - // console.log(`Error updating topic replication factor: ${JSON.stringify(error)}`); - // throw new Error(`Error updating topic replication factor: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); - // } - // } - // } - // } - - case 'Delete': - - console.log(event.ResourceProperties.topic); - - try { - const result = await admin.deleteTopics({ - timeout: event.ResourceProperties.timeout, - topics: [event.ResourceProperties.topic.topic], - }); - - console.log(`Topic deleted: ${result}`); - break; - } - catch (error) { - console.log(`Error deleting topic: ${error.errorMessage}`); - console.log(`Error deleting topic: ${error.message}`); - if (error.errorMessage.includes('topics is not defined')) { - console.log('Topic is not defined, skipping...'); - break; - } - console.log(`Error deleting topic: ${JSON.stringify(error)}`); - throw new Error(`Error deleting topics: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); - - } - + switch(event.ResourceType) { + case "Custom::MskAcl": + console.log("Event for ACL received"); + const responseAcl = await aclCrudOnEvent(event, admin); + console.log(responseAcl); + break; + case "Custom::MskTopic": + console.log("Event for Topic received"); + const responseTopic = await topicCrudOnEvent(event, admin); + console.log(responseTopic); + break; default: - throw new Error(`invalid request type: ${event.RequestType}`); + console.log("Unknown Resource Type"); + throw new Error(`invalid resource type: ${event.ResourceType}`); } + } \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/resources/lambdas/crudIam/topic-crud.mjs b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/topic-crud.mjs new file mode 100644 index 000000000..856e59817 --- /dev/null +++ b/framework/src/streaming/lib/msk/resources/lambdas/crudIam/topic-crud.mjs @@ -0,0 +1,183 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +import { ConfigResourceTypes } from "kafkajs" + +function getDifferentConfigEntries(oldEntries, newEntries) { + if (!oldEntries) { + return newEntries; + } + + const differentEntries = []; + + for (const newEntry of newEntries) { + const oldEntry = oldEntries.find(entry => entry.name === newEntry.name); + if (!oldEntry || oldEntry.value !== newEntry.value) { + differentEntries.push(newEntry); + } + } + + for (const oldEntry of oldEntries) { + const newEntry = newEntries.find(entry => entry.name === oldEntry.name); + if (!newEntry) { + differentEntries.push(oldEntry); + } + } + + return differentEntries; +} + + + +export async function topicCrudOnEvent (event, admin) { + switch (event.RequestType) { + case 'Create': + + console.log(event.ResourceProperties.topic); + + try { + + const result = await admin.createTopics({ + validateOnly: false, + waitForLeaders: event.ResourceProperties.waitForLeaders, + timeout: event.ResourceProperties.timeout, + topics: [event.ResourceProperties.topic], + }); + + console.log(`Topic created: ${result}`); + if ( result == false ) { + throw new Error(`Error creating topic: ${event.ResourceProperties.topic}`); + } + break; + + } + catch (error) { + console.log(`Error creating topic: ${JSON.stringify(error)}`); + throw new Error(`Error creating topic: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); + } + + case 'Update': + + console.info(event.ResourceProperties.topic); + + const oldTopic = event.OldResourceProperties.topic; + const newTopic = event.ResourceProperties.topic; + let updatedConfigEntries = []; + + // We need to find the attributes that were changed in the update + if(newTopic.configEntries || oldTopic.configEntries) { + updatedConfigEntries = getDifferentConfigEntries(oldTopic.configEntries, newTopic.configEntries); + } + + console.log(updatedConfigEntries); + + console.log("updating topic..."); + + console.log({ + type: ConfigResourceTypes.TOPIC, + name: newTopic.topic, + configEntries: updatedConfigEntries + }); + + if(updatedConfigEntries) { + await admin.alterConfigs({ + resources: [{ + type: ConfigResourceTypes.TOPIC, + name: newTopic.topic, + configEntries: updatedConfigEntries + }] + }); + + } + + if ( newTopic.numPartitions > oldTopic.numPartitions ) { + console.log("creating new partitions...") + try { + + const result = await admin.createPartitions({ + validateOnly: false, + timeout: event.ResourceProperties.timeout, + topicPartitions: [{ + topic: newTopic.topic, + count: newTopic.numPartitions, + assignments: undefined + }], + }); + + console.log(`Topic partition count updated: ${result}`); + + } + catch (error) { + console.log(`Error updating topic number of partitions: ${JSON.stringify(error)}`); + throw new Error(`Error updating topic number of partitions: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); + } + } else if ( newTopic.numPartitions < oldTopic.numPartitions ) { + throw new Error(`Error updating topics: number of partitions can't be decreased`); + } + + // if (newTopic.replicationFactor > oldTopic.replicationFactor || newTopic.replicaAssignment !== oldTopic.replicaAssignment) { + // console.log(`Error updating topics: replication factor update is not supported`); + // } + + // if (newTopic.configEntries !== oldTopic.configEntries) { + // console.log(`Error updating topics: configuration entries update is not supported`); + // } + + // if (newTopic.replicationFactor > oldTopic.replicationFactor || newTopic.replicaAssignment !== oldTopic.replicaAssignment) { + // if (newTopic.replicaAssignment === oldTopic.replicaAssignment) { + // throw new Error(`Error updating topics: replication can only be increased by providing replicas assignment`); + // } else { + // console.log("updating partitions assignment...") + // try { + // const result = await admin.alterPartitionReassignments({ + // validateOnly: false, + // timeout: event.ResourceProperties.timeout, + // topics: [{ + // topic: newTopic.topic, + // partitionAssignment: newTopic.replicaAssignment, + // }], + // }); + + // console.log(`Topic replication factor updated: ${result}`); + // await admin.disconnect(); + // break; + // } + // catch (error) { + // await admin.disconnect(); + // console.log(`Error updating topic replication factor: ${JSON.stringify(error)}`); + // throw new Error(`Error updating topic replication factor: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); + // } + // } + // } + + await admin.disconnect(); + break; + + case 'Delete': + + console.log(event.ResourceProperties.topic); + + try { + const result = await admin.deleteTopics({ + timeout: event.ResourceProperties.timeout, + topics: [event.ResourceProperties.topic.topic], + }); + + console.log(`Topic deleted: ${result}`); + break; + } + catch (error) { + console.log(`Error deleting topic: ${error.errorMessage}`); + console.log(`Error deleting topic: ${error.message}`); + if (error.errorMessage.includes('topics is not defined')) { + console.log('Topic is not defined, skipping...'); + break; + } + console.log(`Error deleting topic: ${JSON.stringify(error)}`); + throw new Error(`Error deleting topics: ${event.ResourceProperties.topic}. Error ${JSON.stringify(error)}`); + + } + + default: + throw new Error(`invalid request type: ${event.RequestType}`); + } +} \ No newline at end of file diff --git a/framework/src/streaming/lib/msk/resources/lambdas/tlsCrudAdminClient/index.mjs b/framework/src/streaming/lib/msk/resources/lambdas/tlsCrudAdminClient/index.mjs index 0c8470cbc..fa623d499 100644 --- a/framework/src/streaming/lib/msk/resources/lambdas/tlsCrudAdminClient/index.mjs +++ b/framework/src/streaming/lib/msk/resources/lambdas/tlsCrudAdminClient/index.mjs @@ -98,6 +98,7 @@ export const onEventHandler = async (event) => { const admin = kafka.admin(); console.info('======Received Event======='); + console.info(event); // If the principal is set to REPLACE-WITH-BOOTSTRAP, // we need to replace it with the broker FQDN prefix with a wildcard diff --git a/framework/test/e2e/msk-provisioned-tls.e2e.test.ts b/framework/test/e2e/msk-provisioned-tls.e2e.test.ts index 26b19e97c..1f64421fc 100644 --- a/framework/test/e2e/msk-provisioned-tls.e2e.test.ts +++ b/framework/test/e2e/msk-provisioned-tls.e2e.test.ts @@ -9,17 +9,19 @@ import * as cdk from 'aws-cdk-lib'; import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca'; +import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam'; import { Secret } from 'aws-cdk-lib/aws-secretsmanager'; import { TestStack } from './test-stack'; -import { Authentication, ClientAuthentication, MSK_DEFAULT_VERSION, MskProvisioned } from '../../src/streaming/lib/msk'; +import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, Authentication, ClientAuthentication, MSK_DEFAULT_VERSION, MskProvisioned, ResourcePatternTypes } from '../../src/streaming/lib/msk'; import { Utils } from '../../src/utils'; jest.setTimeout(10000000); // GIVEN const app = new cdk.App(); -const testStack = new TestStack('MskProvisionedTlsTestStack', app); -const { stack } = testStack; +const stack = new cdk.Stack(app, 'MskProvisionedTlsTestStack'); +const testStack = new TestStack('MskProvisionedTlsTestStack', app, stack); +// const { stack } = testStack; stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); @@ -33,6 +35,7 @@ const msk = new MskProvisioned(stack, 'cluster', { clientAuthentication: ClientAuthentication.saslTls( { certificateAuthorities: [certificateAuthority], + iam: true, }, ), kafkaVersion: MSK_DEFAULT_VERSION, @@ -44,8 +47,14 @@ const msk = new MskProvisioned(stack, 'cluster', { }, }); -msk.setTopic('topicProvisioned', Authentication.MTLS, { - topic: 'provisioned', +msk.setTopic('topicProvisionedMtls', Authentication.MTLS, { + topic: 'provisionedMtls', + numPartitions: 1, + replicationFactor: 1, +}, cdk.RemovalPolicy.DESTROY, false, 1500); + +msk.setTopic('topicProvisionedIam', Authentication.IAM, { + topic: 'provisionedIam', numPartitions: 1, replicationFactor: 1, }, cdk.RemovalPolicy.DESTROY, false, 1500); @@ -66,6 +75,110 @@ msk.setTopic('topicConfigentries', Authentication.MTLS, { ], }, cdk.RemovalPolicy.DESTROY, false, 1500); +msk.setAcl('AclMtls', + { + resourceType: AclResourceTypes.TOPIC, + resourceName: 'provisionedIam', + resourcePatternType: ResourcePatternTypes.LITERAL, + principal: 'User:Cn=Toto', + host: '*', + operation: AclOperationTypes.CREATE, + permissionType: AclPermissionTypes.ALLOW, + }, + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS, +); + +msk.setAcl('AclIam', + { + resourceType: AclResourceTypes.TOPIC, + resourceName: 'provisionedMtls', + resourcePatternType: ResourcePatternTypes.LITERAL, + principal: 'User:Cn=Toto', + host: '*', + operation: AclOperationTypes.CREATE, + permissionType: AclPermissionTypes.ALLOW, + }, + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); + +const kafkaClientRole = new Role(stack, 'producerRole', { + assumedBy: new ServicePrincipal('ec2.amazonaws.com'), +}); + +msk.grantProduce('grantIamProduceMtls', + 'provisionedIam', + Authentication.IAM, + kafkaClientRole, + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS, +); + +msk.grantConsume('grantIamConsumeMtls', + 'provisionedMtls', + Authentication.IAM, + kafkaClientRole, + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS, +); + +msk.grantProduce('grantIamProduceIam', + 'provisionedIam', + Authentication.IAM, + kafkaClientRole, + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); + +msk.grantConsume('grantIamConsumeIam', + 'provisionedMtls', + Authentication.IAM, + kafkaClientRole, + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); + +msk.grantProduce('grantMtlsProduceMtls', + 'provisionedIam', + Authentication.MTLS, + 'User:Cn=Toto', + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS, +); + +msk.grantConsume('grantMtlsConsumeMtls', + 'provisionedMtls', + Authentication.MTLS, + 'User:Cn=Toto', + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS, +); + +msk.grantProduce('grantMtlsProduceIam', + 'provisionedIam', + Authentication.MTLS, + 'User:Cn=Toto', + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); + +msk.grantConsume('grantMtlsConsumeIam', + 'provisionedMtls', + Authentication.MTLS, + 'User:Cn=Toto', + '*', + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); + new cdk.CfnOutput(stack, 'MskProvisionedCluster', { value: msk.cluster.attrArn, }); diff --git a/framework/test/unit/streaming/kafka-api.test.ts b/framework/test/unit/streaming/kafka-api.test.ts index 059a1757e..62bea8b4b 100644 --- a/framework/test/unit/streaming/kafka-api.test.ts +++ b/framework/test/unit/streaming/kafka-api.test.ts @@ -164,6 +164,8 @@ describe('Using default KafkaApi configuration with MSK provisioned and IAM and Action: [ 'kafka-cluster:Connect', 'kafka:GetBootstrapBrokers', + 'kafka:DescribeCluster', + 'kafka-cluster:AlterCluster', ], Effect: 'Allow', Resource: { @@ -296,6 +298,126 @@ describe('Using default KafkaApi configuration with MSK provisioned and IAM and ], }, }, + { + Action: [ + 'kafka-cluster:AlterGroup', + 'kafka-cluster:DescribeGroup', + ], + Effect: 'Allow', + Resource: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + 'Fn::Select': [ + 1, + { + 'Fn::Split': [ + ':', + { + 'Fn::GetAtt': [ + 'MyCluster', + 'Arn', + ], + }, + ], + }, + ], + }, + ':kafka:', + { + 'Fn::Select': [ + 3, + { + 'Fn::Split': [ + ':', + { + 'Fn::GetAtt': [ + 'MyCluster', + 'Arn', + ], + }, + ], + }, + ], + }, + ':', + { + 'Fn::Select': [ + 4, + { + 'Fn::Split': [ + ':', + { + 'Fn::GetAtt': [ + 'MyCluster', + 'Arn', + ], + }, + ], + }, + ], + }, + ':group/', + { + 'Fn::Select': [ + 1, + { + 'Fn::Split': [ + '/', + { + 'Fn::Select': [ + 5, + { + 'Fn::Split': [ + ':', + { + 'Fn::GetAtt': [ + 'MyCluster', + 'Arn', + ], + }, + ], + }, + ], + }, + ], + }, + ], + }, + '/', + { + 'Fn::Select': [ + 2, + { + 'Fn::Split': [ + '/', + { + 'Fn::Select': [ + 5, + { + 'Fn::Split': [ + ':', + { + 'Fn::GetAtt': [ + 'MyCluster', + 'Arn', + ], + }, + ], + }, + ], + }, + ], + }, + ], + }, + '/*', + ], + ], + }, + }, ], }, }); diff --git a/framework/test/unit/streaming/msk-provisioned.test.ts b/framework/test/unit/streaming/msk-provisioned.test.ts index ac9738813..00ebb6d6b 100644 --- a/framework/test/unit/streaming/msk-provisioned.test.ts +++ b/framework/test/unit/streaming/msk-provisioned.test.ts @@ -50,8 +50,9 @@ describe('Create an MSK Provisioned cluster with a provided vpc and add topic as operation: AclOperationTypes.CREATE, permissionType: AclPermissionTypes.ALLOW, }, - RemovalPolicy.DESTROY); - }).toThrow('Setting ACLs is only supported with TLS and SASL/SCRAM'); + RemovalPolicy.DESTROY, + Authentication.MTLS); + }).toThrow('MTLS Authentication is not supported for this cluster'); const template = Template.fromStack(stack, {}); @@ -180,7 +181,6 @@ describe('Create an MSK Provisioned cluster with mTlS auth, provided vpc and add permissionType: AclPermissionTypes.ALLOW, }, RemovalPolicy.DESTROY); - const template = Template.fromStack(stack, {}); test('MSK Porivisioned is created', () => { diff --git a/website/docs/constructs/cookbook/contributing.md b/website/docs/constructs/cookbook/contributing.md index 45a5cdfd9..702212300 100644 --- a/website/docs/constructs/cookbook/contributing.md +++ b/website/docs/constructs/cookbook/contributing.md @@ -64,7 +64,13 @@ Now you're ready to start contributing to the framework sub project! ### Testing changes 1. Validate the changes in an AWS account - 1. Use the following code in a file in the e2e test folder and update corresponding values. **DO NOT COMMIT THIS FILE** + 1. Use the following code in a file (Ex: `mytest.e2e.test.ts`) in the `framework/test/e2e` folder and update corresponding values. + +:::warning **DO NOT COMMIT THIS FILE** + +This file used to iteratively test the construct during development should not be committed to the repository. Only standard e2e tests should be committed. + +::: ```typescript /** @@ -80,8 +86,8 @@ Now you're ready to start contributing to the framework sub project! // GIVEN const app = new cdk.App(); - const stack = new Stack(app, 'MyStack'); - const testStack = new TestStack('MskServerkessTestStack', app, stack); + const stack = new Stack(app, 'E2eStack'); + const testStack = new TestStack('E2eTestStack', app, stack); stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true); diff --git a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx index 41ffb5fdc..3d7e169ee 100644 --- a/website/docs/constructs/library/generated/_streaming-kafka-api.mdx +++ b/website/docs/constructs/library/generated/_streaming-kafka-api.mdx @@ -129,6 +129,7 @@ The topic is defined by the property type called `MskTopic`. Below you can see t topic: , numPartitions: , // default: -1 (uses broker `num.partitions` configuration) replicationFactor: , // default: -1 (uses broker `default.replication.factor` configuration) + configEntries: // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: [] } ``` @@ -174,7 +175,7 @@ Only the number of partitions can be updated after the creation of the topic. ### setACL This method allows you to create, update or delete an ACL. Its backend uses [kafkajs](https://kafka.js.org/). -The topic is defined by the property type called `MskACL`. This method should be used only when the cluster authentication is set to `mTLS`. Below you can see the definition of the topic as well as an example of use. +The topic is defined by the property type called `MskACL`. This method can be used when the cluster authentication is set to `mTLS` or `IAM`+`mTLS`. Below you can see the definition of the ACL as well as an example of use. ```json { @@ -188,6 +189,8 @@ The topic is defined by the property type called `MskACL`. This method should be } ``` +You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses mTLS as authentication. + @@ -202,7 +205,8 @@ kafkaApi.setAcl('acl', operation: AclOperationTypes.CREATE, permissionType: AclPermissionTypes.ALLOW, }, - cdk.RemovalPolicy.DESTROY + cdk.RemovalPolicy.DESTROY, + Authentication.MTLS ); ``` @@ -220,7 +224,7 @@ kafka_api.set_acl("acl", Acl( host="*", operation=AclOperationTypes.CREATE, permission_type=AclPermissionTypes.ALLOW -), cdk.RemovalPolicy.DESTROY) +), cdk.RemovalPolicy.DESTROY, Authentication.MTLS) ``` diff --git a/website/docs/constructs/library/generated/_streaming-msk-provisioned.mdx b/website/docs/constructs/library/generated/_streaming-msk-provisioned.mdx index f39927394..2052c5d6a 100644 --- a/website/docs/constructs/library/generated/_streaming-msk-provisioned.mdx +++ b/website/docs/constructs/library/generated/_streaming-msk-provisioned.mdx @@ -187,6 +187,16 @@ msk.setTopic('topic1', topic: 'topic1', numPartitions: 3, replicationFactor: 1, + configEntries: [ + { + name: 'retention.ms', + value: '90000', + }, + { + name: 'retention.bytes', + value: '90000', + }, + ], }, cdk.RemovalPolicy.DESTROY, false, 1500); ``` @@ -199,7 +209,15 @@ msk.setTopic('topic1', msk.set_topic("topic1", Authentication.IAM, MskTopic( topic="topic1", num_partitions=3, - replication_factor=1 + replication_factor=1, + config_entries=[{ + "name": "retention.ms", + "value": "90000" + }, { + "name": "retention.bytes", + "value": "90000" + } + ] ), cdk.RemovalPolicy.DESTROY, False, 1500) ``` @@ -223,6 +241,8 @@ The topic is defined by the property type called `MskACL`. This method should be } ``` +You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses IAM as authentication. + @@ -236,7 +256,9 @@ msk.setAcl('acl', { operation: AclOperationTypes.CREATE, permissionType: AclPermissionTypes.ALLOW, }, - cdk.RemovalPolicy.DESTROY); + cdk.RemovalPolicy.DESTROY, + Authentication.IAM, +); ``` ```mdx-code-block @@ -253,7 +275,7 @@ msk.set_acl("acl", Acl( host="*", operation=AclOperationTypes.CREATE, permission_type=AclPermissionTypes.ALLOW -), cdk.RemovalPolicy.DESTROY) +), cdk.RemovalPolicy.DESTROY, Authentication.IAM) ``` diff --git a/website/docs/constructs/library/generated/_streaming-msk-serverless.mdx b/website/docs/constructs/library/generated/_streaming-msk-serverless.mdx index cd1503365..a86b4c424 100644 --- a/website/docs/constructs/library/generated/_streaming-msk-serverless.mdx +++ b/website/docs/constructs/library/generated/_streaming-msk-serverless.mdx @@ -116,6 +116,16 @@ let topic: MskTopic = { topic: 'topic1', numPartitions: 3, replicationFactor: 1, + configEntries: [ + { + name: 'retention.ms', + value: '90000', + }, + { + name: 'retention.bytes', + value: '90000', + }, + ], } msk.addTopic('topic1', topic, cdk.RemovalPolicy.DESTROY, false, 1500); @@ -132,7 +142,15 @@ msk = MskServerless(stack, "cluster") topic = MskTopic( topic="topic1", num_partitions=3, - replication_factor=1 + replication_factor=1, + config_entries=[{ + "name": "retention.ms", + "value": "90000" + }, { + "name": "retention.bytes", + "value": "90000" + } + ] ) msk.add_topic("topic1", topic, cdk.RemovalPolicy.DESTROY, False, 1500)