Skip to content

Commit

Permalink
feat(msk): MSK serverless and provisioned construct (#571)
Browse files Browse the repository at this point in the history
Add support to create MSK serverless as well as MSK provisioned. Users can also manage topics and ACLs for the created cluster through this construct.
  • Loading branch information
lmouhib authored May 27, 2024
1 parent 1d16f3b commit 9ef8480
Show file tree
Hide file tree
Showing 40 changed files with 9,692 additions and 3,679 deletions.
8,743 changes: 5,182 additions & 3,561 deletions framework/API.md

Large diffs are not rendered by default.

172 changes: 172 additions & 0 deletions framework/src/streaming/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,175 @@
[//]: # (streaming.msk-provisioned)
# MSK Provisioned

An MSK Provisioned cluster with helpers to manage topics, ACLs and IAM permissions.

## Overview

The construct creates an MSK Provisioned Cluster, with the latest Kafka version in MSK as default. You can change the defaults by passing your own parameters as a Resource property to construct initializer. The construct supports creating clusters with mTLS, IAM or both as authentication methods. The construct use IAM as authentication by default if none is provided. It offers methods to manage topics and ACLs. Last, it also provides methods to grant an existing principal (ie IAM Role or IAM User or CN -Common Name-) with the permission to `produce` or `consume` to/from a kafka topic. The diagram below shows the high level architecture.

![MSK Provisioned High level architecture](../../../website/static/img/msk-provisioned.png)

The construct can create a VPC on your behalf that is used to deploy MSK Provisioned cluster or you can provide your own VPC definition through the `vpcConfigs` property when you initialize the construct. The VPC that is created on your behalf has `10.0.0.0/16` CIDR range, and comes with an S3 VPC Endpoint Gateway attached to it. The construct also creates a security group that is attached to the brokers.

### Construct cluster setup

The construct sets up a dedicated security group for Zookeeper as advised in the AWS [documentation](https://docs.aws.amazon.com/msk/latest/developerguide/zookeeper-security.html#zookeeper-security-group). When authentication is set to TLS, the construct apply ACLs on the provided principal in the props defined as `certificateDefinition`. This principal is used by the custom resource to manage ACL. Last, the construct applies MSK configuration setting `allow.everyone.if.no.acl.found` to `false`. You can also provide your own MSK configuration, in this case the construct does not create one and will apply the one you passed as part of the props.

### Interacting with cluster

The construct has the following methods, you will usage examples in the new sections:

* setTopic: Perform create, update, and delete operations on Topics
* setACL: Perform create, update, and delete operations on ACL
* grantProduce: Attach an IAM policy to a principal to write to a topic
* grantConsume: Attach an IAM policy to a principal to read from a topic

Below you can find an example of creating an MSK Provisioned configuration with the default options.

[example msk provisioned default](./examples/msk-provisioned-default.lit.ts)


## Usage

### Bring Your Own VPC

The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.


[example msk provisioned bring your own vpc](./examples/msk-provisioned-bring-vpc.lit.ts)


### Create a cluster with mTLS authentication

The construct allows you to create a cluster with mTLS, below is a code snippet showing the configuration.

When using MSK with mTLS the constructs requires a principal that is assigned to the custom resources that manage ACLs and Topics. The certificate and private key are expected to be in a secret managed by [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html). The secret needs to be in the format defined below and stored a `JSON Key/value` and not `Plaintext` in the Secret. The construct grants the lambda that supports the Custom Resource read access to the secret as an `Identity based policy`.

```json
{
key : "-----BEGIN RSA PRIVATE KEY----- XXXXXXXXXXXXXXXXX -----END RSA PRIVATE KEY-----",

cert : "-----BEGIN CERTIFICATE----- yyyyyyyyyyyyyyyy -----END CERTIFICATE-----"
}
```

[example msk provisioned with mTLS](./examples/msk-provisioned-create-cluster-mtls.lit.ts)

### setTopic

This method allows you to create, update or delete an ACL. Its backend uses [kafkajs](https://kafka.js.org/).
The topic is defined by the property type called `MskTopic`. Below you can see the definition of the ACL as well as a usage.

```json
{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
replicaAssignment: <Array>, // Example: [{ partition: 0, replicas: [0,1,2] }] - default: []
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
```

Dependeding on the authentication type that is set in the cluster, you need to put the right parameter in authentication, for mTLS use `Authentitcation.MTLS` and for IAM use `Authentitcation.IAM`. The example below uses IAM as authentication.

[example msk provisiond setTopic](./examples/msk-provisioned-set-topic.lit.ts)

### setACL

This method allows you to create, update or delete a topic. Its backend uses [kafkajs](https://kafka.js.org/).
The topic is defined by the property type called `MskACL`. This method should be used only when the cluster authentication is set to `mTLS`. Below you can see the definition of the topic as well as an example of use.

```json
{
resourceType: <AclResourceTypes>,
resourceName: <String>,
resourcePatternType: <ResourcePatternTypes>,
principal: <String>,
host: <String>,
operation: <AclOperationTypes>,
permissionType: <AclPermissionTypes>,
}
```

[example msk provisiond setACL](./examples/msk-provisioned-set-acl.lit.ts)

### grantProduce

This method allows to grant a `Principal` the rights to write to a kafka topic.
In case of IAM authentication the method attachs an IAM policy as defined in the [AWS documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#iam-access-control-use-cases) scoped only to the topic provided. For mTLS authentication, the method apply an ACL for the provided `Common Name` that allows it to write to the topic.


[example msk provisioned grantProduce](./examples/msk-provisioned-grant-produce.lit.ts)

### grantConsume
This method allows to grant a `Principal` the rights to read to a kafka topic.
In case of IAM authentication the method attachs an IAM policy as defined in the [AWS documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#iam-access-control-use-cases) scoped only to the topic provided. For mTLS authentication, the method apply an ACL for the provided `Common Name` that allows it to read from the topic.

[example msk provisioned grantConsume](./examples/msk-provisioned-grant-consume.lit.ts)

[//]: # (streaming.msk-serverless)
# MSK Serverless

An MSK Serverless cluster with helpers to manage topics and IAM permissions.

## Overview

The construct creates an MSK Serverless Cluster, with the latest Kafka version in MSK as default. You can change the dafaults by passing your own parameters as a Resource property to construct initializer. There is also a method to create topics. Last, it also provides methods to grant an existing principal (ie IAM Role or IAM User) with the permission to `produce` or `consume` from a kafka topic. The diagram below shows the high level architecture.

![MSK Serverless High level architecture](../../../website/static/img/msk-serverless.png)


The construct can create a VPC on your behalf that is used to deploy MSK Serverless cluser or you can provide your own VPC definition through the `vpcConfigs` property when you initialize the construct. The VPC that is created on your behalf has `10.0.0.0/16` CIDR range, and comes with an S3 VPC Endpoint Gateway attached to it. The construct also creates a security group for that is attached to the brokers.

The construct has the following interfaces, you will usage examples in the new sections:
* setTopic: Perform create, update, and delete operations on Topics
* grantProduce: Attach an IAM policy to a principal to write to a topic
* grantConsume: Attach an IAM policy to a principal to read from a topic

Below you can find an example of creating an MSK Serverless configuration with the default options.

[example msk serverless default](./examples/msk-serverless-default.lit.ts)


## Usage

### Bring Your Own VPC

The construct allows you to provide your own VPC that was created outside the CDK Stack. Below you will find an example usage.


[example msk serverless bring your own vpc](./examples/msk-serverless-bring-vpc.lit.ts)

### setTopic

This method allows you to create, update or delete a topic. Its backend uses [kafkajs](https://kafka.js.org/).
The topic is defined by the property type called `MskTopic`. Below you can see the definition of the topic as well as an example of use.

```json
{
topic: <String>,
numPartitions: <Number>, // default: -1 (uses broker `num.partitions` configuration)
replicationFactor: <Number>, // default: -1 (uses broker `default.replication.factor` configuration)
configEntries: <Array> // Example: [{ name: 'cleanup.policy', value: 'compact' }] - default: []
}
```

[example msk serverless default](./examples/msk-serverless-set-topic.lit.ts)

### grantProduce

This method allows to grant a `Principal` the rights to write to a kafka topic.
The method attachs an IAM policy as defined in the [AWS documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#iam-access-control-use-cases) scoped only to the topic provided.


[example msk serverless grantProduce](./examples/msk-serverless-grant-produce.lit.ts)

### grantConsume
This method allows to grant a `Principal` the rights to read to a kafka topic.
The method attachs an IAM policy as defined in the [AWS documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#iam-access-control-use-cases) scoped only to the topic provided.

[example msk serverless grantProduce](./examples/msk-serverless-grant-consume.lit.ts)

[//]: # (streaming.kafka-api)
# Kafka Api - Bring your own cluster

Expand Down
30 changes: 30 additions & 0 deletions framework/src/streaming/examples/msk-provisioned-bring-vpc.lit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import * as cdk from 'aws-cdk-lib';
import { MskProvisioned } from '../lib/msk';
import { Vpc } from 'aws-cdk-lib/aws-ec2';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'DsfTestMskServerless');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);

/// !show
let vpc = Vpc.fromVpcAttributes(stack, 'vpc', {
vpcId: 'vpc-1111111111',
vpcCidrBlock: '10.0.0.0/16',
availabilityZones: ['eu-west-1a', 'eu-west-1b'],
publicSubnetIds: ['subnet-111111111', 'subnet-11111111'],
privateSubnetIds: ['subnet-11111111', 'subnet-1111111'],
});

const msk = new MskProvisioned(stack, 'cluster', {
vpc: vpc,
clusterName: 'my-cluster',
subnets: vpc.selectSubnets(),
});
/// !hide

new cdk.CfnOutput(stack, 'mskArn', {
value: msk.cluster.attrArn,
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import * as cdk from 'aws-cdk-lib';
import { Authentication, ClientAuthentication, MskProvisioned } from '../lib/msk';
import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'MskProvisionedDsf');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);

/// !show
let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b');

const msk = new MskProvisioned(stack, 'cluster', {
clientAuthentication: ClientAuthentication.saslTls(
{
iam: true,
certificateAuthorities: [certificateAuthority],
},
),
certificateDefinition: {
adminPrincipal: 'User:CN=Admin',
aclAdminPrincipal: 'User:CN=aclAdmin',
secretCertificate: Secret.fromSecretCompleteArn(stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ'),
},
allowEveryoneIfNoAclFound: false,
});
/// !hide

msk.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');


26 changes: 26 additions & 0 deletions framework/src/streaming/examples/msk-provisioned-default.lit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import * as cdk from 'aws-cdk-lib';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes,MskProvisioned, ResourcePatternTypes } from '../lib/msk';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'MskProvisionedDsf');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);

/// !show
const msk = new MskProvisioned(stack, 'cluster');
/// !hide

msk.setAcl('acl', {
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-1',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:Cn=Toto',
host: '*',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY);


Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import * as cdk from 'aws-cdk-lib';
import { Authentication, ClientAuthentication, MskProvisioned } from '../lib/msk';
import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'MskProvisionedDsf');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);


let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b');

const msk = new MskProvisioned(stack, 'cluster', {
clientAuthentication: ClientAuthentication.saslTls(
{
iam: true,
certificateAuthorities: [certificateAuthority],
},
),
certificateDefinition: {
adminPrincipal: 'User:CN=Admin',
aclAdminPrincipal: 'User:CN=aclAdmin',
secretCertificate: Secret.fromSecretCompleteArn(stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ'),
},
allowEveryoneIfNoAclFound: false,
});

/// !show
msk.grantConsume('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');
/// !hide



Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import * as cdk from 'aws-cdk-lib';
import { Authentication, ClientAuthentication, MskProvisioned } from '../lib/msk';
import { CertificateAuthority } from 'aws-cdk-lib/aws-acmpca';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'MskProvisionedDsf');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);


let certificateAuthority = CertificateAuthority.fromCertificateAuthorityArn(
stack, 'certificateAuthority',
'arn:aws:acm-pca:eu-west-1:123456789012:certificate-authority/aaaaaaaa-bbbb-454a-cccc-b454877f0d1b');

const msk = new MskProvisioned(stack, 'cluster', {
clientAuthentication: ClientAuthentication.saslTls(
{
iam: true,
certificateAuthorities: [certificateAuthority],
},
),
certificateDefinition: {
adminPrincipal: 'User:CN=Admin',
aclAdminPrincipal: 'User:CN=aclAdmin',
secretCertificate: Secret.fromSecretCompleteArn(stack, 'secret', 'arn:aws:secretsmanager:eu-west-1:123456789012:secret:dsf/mskCert-3UhUJJ'),
},
allowEveryoneIfNoAclFound: false,
});

/// !show
msk.grantProduce('consume', 'foo', Authentication.MTLS, 'User:Cn=MyUser');
/// !hide



25 changes: 25 additions & 0 deletions framework/src/streaming/examples/msk-provisioned-set-acl.lit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import * as cdk from 'aws-cdk-lib';
import { AclOperationTypes, AclPermissionTypes, AclResourceTypes, MskProvisioned, ResourcePatternTypes } from '../lib/msk';


const app = new cdk.App();

const stack = new cdk.Stack(app, 'MskProvisionedDsf');

stack.node.setContext('@data-solutions-framework-on-aws/removeDataOnDestroy', true);


const msk = new MskProvisioned(stack, 'cluster');

/// !show
msk.setAcl('acl', {
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-1',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:Cn=Bar',
host: '*',
operation: AclOperationTypes.CREATE,
permissionType: AclPermissionTypes.ALLOW,
},
cdk.RemovalPolicy.DESTROY);
/// !hide
Loading

0 comments on commit 9ef8480

Please sign in to comment.