Skip to content

Commit

Permalink
feat(kafka-api): Add IAM as Authentication method when creating ACL w…
Browse files Browse the repository at this point in the history
…ith kafka api (#655)

* Support setting ACLs in cluster with heterogeneous authentication, use IAM as Authentication method when creating ACL with kafka api
  • Loading branch information
armaseg committed Jun 12, 2024
1 parent 0d33987 commit 9735429
Show file tree
Hide file tree
Showing 21 changed files with 892 additions and 267 deletions.
169 changes: 117 additions & 52 deletions framework/API.md

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion framework/src/streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ The topic is defined by the property type called `MskACL`. This method should be
}
```

You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses IAM as authentication.

[example msk provisiond setACL](./examples/msk-provisioned-set-acl.lit.ts)

### grantProduce
Expand Down Expand Up @@ -231,6 +233,7 @@ The topic is defined by the property type called `MskTopic`. Below you can see t
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
```

Expand All @@ -247,7 +250,7 @@ Only the number of partitions can be updated after the creation of the topic.
### setACL

This method allows you to create, update or delete an ACL. Its backend uses [kafkajs](https://kafka.js.org/).
The topic is defined by the property type called `MskACL`. This method should be used only when the cluster authentication is set to `mTLS`. Below you can see the definition of the topic as well as an example of use.
The topic is defined by the property type called `MskACL`. This method can be used when the cluster authentication is set to `mTLS` or `IAM`+`mTLS`. Below you can see the definition of the ACL as well as an example of use.

```json
{
Expand All @@ -261,6 +264,8 @@ The topic is defined by the property type called `MskACL`. This method should be
}
```

You can authenticate to your cluster using IAM or mTLS to create ACLs. These ACLs will be used later by a client that will authenticate to your cluster using mTLS. Dependeding on the authentication type that you would like to use to create the ACL, you need to put the right parameter in `clientAuthentication`: for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. Default value is `Authentitcation.MTLS`. The example below uses mTLS as authentication.

[example msk provisiond setACL](./examples/kafka-api-set-acl.lit.ts)

### grantProduce
Expand Down
5 changes: 3 additions & 2 deletions framework/src/streaming/examples/kafka-api-set-acl.lit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as cdk from 'aws-cdk-lib';
import { SecurityGroup, Vpc } from 'aws-cdk-lib/aws-ec2';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaClientLogLevel, MskClusterType, ResourcePatternTypes } from '../lib/msk';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, ClientAuthentication, KafkaClientLogLevel, MskClusterType, ResourcePatternTypes, Authentication } from '../lib/msk';
import { KafkaApi } from '../lib/msk/kafka-api';
import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
Expand Down Expand Up @@ -56,6 +56,7 @@ kafkaApi.setAcl('acl',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY
cdk.RemovalPolicy.DESTROY,
Authentication.MTLS
);
/// !hide
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as cdk from 'aws-cdk-lib';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, MskProvisioned, ResourcePatternTypes } from '../lib/msk';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, MskProvisioned, ResourcePatternTypes, Authentication } from '../lib/msk';


const app = new cdk.App();
Expand All @@ -21,5 +21,7 @@ msk.setAcl('acl', {
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY);
cdk.RemovalPolicy.DESTROY,
Authentication.IAM,
);
/// !hide
10 changes: 10 additions & 0 deletions framework/src/streaming/examples/msk-provisioned-set-topic.lit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ msk.setTopic('topic1',
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{
name: 'retention.ms',
value: '90000',
},
{
name: 'retention.bytes',
value: '90000',
},
],
}, cdk.RemovalPolicy.DESTROY, false, 1500);
/// !hide

Expand Down
10 changes: 10 additions & 0 deletions framework/src/streaming/examples/msk-serverless-set-topic.lit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ let topic: MskTopic = {
topic: 'topic1',
numPartitions: 3,
replicationFactor: 1,
configEntries: [
{
name: 'retention.ms',
value: '90000',
},
{
name: 'retention.bytes',
value: '90000',
},
],
}

msk.addTopic('topic1', topic, cdk.RemovalPolicy.DESTROY, false, 1500);
Expand Down
100 changes: 66 additions & 34 deletions framework/src/streaming/lib/msk/kafka-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
import { Context, TrackedConstruct, TrackedConstructProps } from '../../../utils';

/**
* A construct to create an MSK Serverless cluster
* A construct to create a Kafka API admin client
* @see https://awslabs.github.io/data-solutions-framework-on-aws/
*
* @example
Expand Down Expand Up @@ -69,7 +69,7 @@ export class KafkaApi extends TrackedConstruct {
private readonly clusterType: MskClusterType;

/**
* Constructs a new instance of the EmrEksCluster construct.
* Constructs a new instance of the Kafka API construct.
* @param {Construct} scope the Scope of the CDK Construct
* @param {string} id the ID of the CDK Construct
* @param {MskServerlessProps} props
Expand Down Expand Up @@ -136,20 +136,38 @@ export class KafkaApi extends TrackedConstruct {
}

/**
* Creates a topic in the Msk Cluster
* @param {string} id the CDK id for Topic
* @param {Acl} aclDefinition the Kafka Acl definition
* Creates a ACL in the MSK Cluster
* @param {string} id the CDK ID of the ACL
* @param {Acl} aclDefinition the Kafka ACL definition
* @param {RemovalPolicy} removalPolicy Wether to keep the ACL or delete it when removing the resource from the Stack. @default - RemovalPolicy.RETAIN
* @returns {CustomResource} The MskAcl custom resource created
* @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the ACL @default - Authentication.MTLS
* @returns {CustomResource} The MskAcl custom resource created by the Kafka API admin client
*/
public setAcl(
id: string,
aclDefinition: Acl,
removalPolicy?: RemovalPolicy,
clientAuthentication?: Authentication,
): CustomResource {

let serviceToken: string;

let customResourceAuthentication = clientAuthentication ?? Authentication.MTLS;

if (customResourceAuthentication === Authentication.IAM) {
if (!this.mskIamServiceToken) {
throw Error('IAM Authentication is not supported for this cluster');
}
serviceToken = this.mskIamServiceToken!;
} else {
if (!this.mskAclServiceToken) {
throw Error('MTLS Authentication is not supported for this cluster');
}
serviceToken = this.mskAclServiceToken!;
}

const cr = new CustomResource(this, id, {
serviceToken: this.mskAclServiceToken!,
serviceToken: serviceToken,
properties: {
logLevel: this.kafkaClientLogLevel,
secretArn: this.tlsCertifacateSecret?.secretArn,
Expand All @@ -171,14 +189,14 @@ export class KafkaApi extends TrackedConstruct {
}

/**
* Creates a topic in the Msk Cluster
* @param {string} id the CDK id for Topic
* @param {Authentication} clientAuthentication The client authentication to use when creating the Topic
* Creates a topic in the MSK Cluster
* @param {string} id the CDK ID for Topic
* @param {Authentication} clientAuthentication The authentication used by the Kafka API admin client to create the topic
* @param {MskTopic} topicDefinition the Kafka topic definition
* @param {RemovalPolicy} removalPolicy Wether to keep the topic or delete it when removing the resource from the Stack. @default - RemovalPolicy.RETAIN
* @param {boolean} waitForLeaders If this is true it will wait until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE
* @param {boolean} waitForLeaders If set to true, waits until metadata for the new topics doesn't throw LEADER_NOT_AVAILABLE
* @param {number} timeout The time in ms to wait for a topic to be completely created on the controller node @default - 5000
* @returns {CustomResource} The MskTopic custom resource created
* @returns {CustomResource} The MskTopic custom resource created by the Kafka API admin client
*/
public setTopic(
id: string,
Expand All @@ -192,17 +210,22 @@ export class KafkaApi extends TrackedConstruct {
let region = Stack.of(this).region;

if (this.clusterType === MskClusterType.SERVERLESS && topicDefinition.replicationFactor !== undefined) {
// (topicDefinition.replicaAssignment !== undefined || topicDefinition.replicationFactor !== undefined)) {
throw new Error("The topic definition is incorrect: MSK Serverless doesn't support replication factor and replication assignments");
}

if (clientAuthentication === Authentication.IAM) {
if (!this.mskIamServiceToken) {
throw Error('IAM Authentication is not supported for this cluster');
}
serviceToken = this.mskIamServiceToken!;
} else {
if (!this.mskAclServiceToken) {
throw Error('MTLS Authentication is not supported for this cluster');
}
serviceToken = this.mskAclServiceToken!;
}

// Create custom resource with async waiter until the Amazon EMR Managed Endpoint is created
// Create custom resource with async waiter until the MSK topic is created
const cr = new CustomResource(this, id, {
serviceToken: serviceToken,
properties: {
Expand All @@ -223,23 +246,28 @@ export class KafkaApi extends TrackedConstruct {
}

/**
* Grant a principal to produce data to a topic
* @param {string} id the CDK resource id
* @param {string} topicName the topic to which the principal can produce data
* @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource
* @param {IPrincipal | string } principal the IAM principal to grand the produce to
* @param {string} host the host to which the principal can produce data. @default - * is used
* Grant a principal permissions to produce to a topic
* @param {string} id the CDK resource ID
* @param {string} topicName the target topic to grant produce permissions on
* @param {Authentication} clientAuthentication The authentication mode of the producer
* @param {IPrincipal | string } principal the principal receiving grant produce permissions
* @param {string} host the host of the producer. @default - * is used
* @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used
* @returns When MTLS is used as authentication an ACL is created using the MskAcl Custom resource to write in the topic is created and returned,
* you can use it to add dependency on it.
* @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer)
* @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication
*/
public grantProduce(
id: string,
topicName: string,
clientAuthentication: Authentication,
principal: IPrincipal | string,
host?: string,
removalPolicy?: RemovalPolicy) : CustomResource | undefined {
removalPolicy?: RemovalPolicy,
customResourceAuthentication?: Authentication,
) : CustomResource | undefined {


let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication;

if (clientAuthentication === Authentication.IAM) {

Expand Down Expand Up @@ -272,32 +300,35 @@ export class KafkaApi extends TrackedConstruct {
permissionType: AclPermissionTypes.ALLOW,
},
Context.revertRemovalPolicy(this, removalPolicy),
authentication,
);

return cr;
}


}

/**
* Grant a principal the right to consume data from a topic
* @param {string} id the CDK resource id
* @param {string} topicName the topic to which the principal can produce data
* @param {Authentitcation} clientAuthentication The client authentication to use when grant on resource
* @param {IPrincipal | string } principal the IAM principal to grand the produce to
* @param {string} host the host to which the principal can produce data. @default - * is used
* Grant a principal permissions to consume from a topic
* @param {string} id the CDK resource ID
* @param {string} topicName the target topic to grant consume permissions on
* @param {Authentication} clientAuthentication The authentication mode of the consumer
* @param {IPrincipal | string } principal the principal receiveing grant consume permissions
* @param {string} host the host of the consumer. @default - * is used
* @param {RemovalPolicy} removalPolicy the removal policy to apply to the grant. @default - RETAIN is used
* @returns When MTLS is used as authentication an ACL is created using the MskAcl Custom resource to read from the topic is created and returned,
* you can use it to add dependency on it.
* @param {Authentication} customResourceAuthentication The authentication used by the Kafka API admin client to create the ACL @default - clientAuthentication (same authentication as the target producer)
* @returns The MskAcl custom resource for MTLS clientAuthentication. Nothing for IAM clientAuthentication
*/
public grantConsume(
id: string,
topicName: string,
clientAuthentication: Authentication,
principal: IPrincipal | string,
host?: string,
removalPolicy?: RemovalPolicy) : CustomResource | undefined {
removalPolicy?: RemovalPolicy,
customResourceAuthentication?: Authentication,
) : CustomResource | undefined {

let authentication = customResourceAuthentication == undefined ? clientAuthentication: customResourceAuthentication;

if (clientAuthentication === Authentication.IAM) {

Expand Down Expand Up @@ -331,6 +362,7 @@ export class KafkaApi extends TrackedConstruct {
permissionType: AclPermissionTypes.ALLOW,
},
removalPolicy ?? RemovalPolicy.DESTROY,
authentication,
);

return cr;
Expand Down
11 changes: 11 additions & 0 deletions framework/src/streaming/lib/msk/msk-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export function mskIamCrudProviderSetup(
actions: [
'kafka-cluster:Connect',
'kafka:GetBootstrapBrokers',
'kafka:DescribeCluster',
'kafka-cluster:AlterCluster',
],
resources: [
clusterArn,
Expand All @@ -81,6 +83,15 @@ export function mskIamCrudProviderSetup(
`arn:${partition}:kafka:${region}:${account}:topic/${clusterNameUuid}/*`,
],
}),
new PolicyStatement({
actions: [
'kafka-cluster:AlterGroup',
'kafka-cluster:DescribeGroup',
],
resources: [
`arn:${partition}:kafka:${region}:${account}:group/${clusterNameUuid}/*`,
],
}),
];

//Attach policy to IAM Role
Expand Down
Loading

0 comments on commit 9735429

Please sign in to comment.