Skip to content

Commit

Permalink
feat: Add support for MSK Source Configuration (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
fdmsantos committed Feb 22, 2024
1 parent 0fa8a09 commit 9262a54
Show file tree
Hide file tree
Showing 13 changed files with 393 additions and 57 deletions.
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ All notable changes to this project will be documented in this file.

### ⚠ BREAKING CHANGES

* Add support for Opensearch and Opensearch Serverless destinations

### Features

* Add support for Opensearch and Opensearch Serverless destinations ([9c37b8f](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/commit/9c37b8f6b2c512ee1a493a044f9b2c852f6eeec8))

### [2.2.3](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/compare/v2.2.2...v2.2.3) (2024-02-16)
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ generate-toc:
gh-md-toc README.md

pre-commit:
/usr/local/bin/pre-commit run -a
pre-commit run -a
25 changes: 0 additions & 25 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,3 @@
of your accepting any such warranty or additional liability.

END OF TERMS AND CONDITIONS

APPENDIX: How to apply the Apache License to your work.

To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
67 changes: 53 additions & 14 deletions README.md

Large diffs are not rendered by default.

60 changes: 60 additions & 0 deletions examples/s3/msk-to-s3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Kinesis Firehose: Kinesis Data Source To S3

Basic Configuration in this directory creates kinesis firehose stream with MSK Cluster as source and S3 bucket as destination with a basic configuration.

## Usage

To run this example you need to execute:

```bash
$ terraform init
$ terraform plan
$ terraform apply
```

Note that this example may create resources which cost money. Run `terraform destroy` when you don't need these resources.

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Requirements

| Name | Version |
|------|---------|
| <a name="requirement_terraform"></a> [terraform](#requirement\_terraform) | >= 0.13.1 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | ~> 5.0 |
| <a name="requirement_random"></a> [random](#requirement\_random) | ~> 3.0 |

## Providers

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | ~> 5.0 |
| <a name="provider_random"></a> [random](#provider\_random) | ~> 3.0 |

## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_firehose"></a> [firehose](#module\_firehose) | ../../../ | n/a |
| <a name="module_msk_cluster"></a> [msk\_cluster](#module\_msk\_cluster) | terraform-aws-modules/msk-kafka-cluster/aws | 2.3.0 |
| <a name="module_security_group"></a> [security\_group](#module\_security\_group) | terraform-aws-modules/security-group/aws | ~> 5.0 |
| <a name="module_vpc"></a> [vpc](#module\_vpc) | terraform-aws-modules/vpc/aws | ~> 5.0 |

## Resources

| Name | Type |
|------|------|
| [aws_msk_cluster_policy.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/msk_cluster_policy) | resource |
| [aws_s3_bucket.s3](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket) | resource |
| [random_pet.this](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/pet) | resource |
| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_name_prefix"></a> [name\_prefix](#input\_name\_prefix) | Name prefix to use in resources | `string` | `"msk-to-s3-basic"` | no |

## Outputs

No outputs.
<!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
113 changes: 113 additions & 0 deletions examples/s3/msk-to-s3/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
data "aws_availability_zones" "available" {}

locals {
vpc_cidr = "10.0.0.0/16"
azs = slice(data.aws_availability_zones.available.names, 0, 2)
}

resource "random_pet" "this" {
length = 2
}

resource "aws_s3_bucket" "s3" {
bucket = "${var.name_prefix}-destination-bucket-${random_pet.this.id}"
force_destroy = true
}

module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "~> 5.0"
name = "${var.name_prefix}-vpc"
cidr = local.vpc_cidr
azs = local.azs
public_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k)]
private_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k + 3)]
database_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k + 6)]
create_database_subnet_group = true
enable_nat_gateway = true
single_nat_gateway = true
}

module "security_group" {
source = "terraform-aws-modules/security-group/aws"
version = "~> 5.0"

name = "${var.name_prefix}-sg"
description = "Security group for ${var.name_prefix}-sg"
vpc_id = module.vpc.vpc_id

ingress_cidr_blocks = module.vpc.private_subnets_cidr_blocks
ingress_rules = [
"kafka-broker-tcp",
"kafka-broker-tls-tcp"
]
}

module "msk_cluster" {
source = "terraform-aws-modules/msk-kafka-cluster/aws"
version = "2.3.0"
name = "${var.name_prefix}-msk"
kafka_version = "3.4.0"
number_of_broker_nodes = 2
broker_node_client_subnets = module.vpc.public_subnets
broker_node_instance_type = "kafka.m5.large"
broker_node_security_groups = [module.security_group.security_group_id]
broker_node_connectivity_info = {
public_access = {
# type = "SERVICE_PROVIDED_EIPS"
type = "DISABLED"
}
vpc_connectivity = {
client_authentication = {
tls = false
sasl = {
iam = true
scram = false
}
}
}
}
client_authentication = {
sasl = { iam = true }
}
enable_storage_autoscaling = false
create_cloudwatch_log_group = false
cloudwatch_logs_enabled = false
s3_logs_enabled = false
configuration_name = "${var.name_prefix}-msk-configuration"
configuration_description = "${var.name_prefix} MSK configuration"
configuration_server_properties = {
"allow.everyone.if.no.acl.found" = false
}
}

resource "aws_msk_cluster_policy" "this" {
cluster_arn = module.msk_cluster.arn
policy = jsonencode({
Version = "2012-10-17",
Statement = [{
Sid = "FirehoseMskClusterPolicy"
Effect = "Allow"
Principal = {
"Service" = "firehose.amazonaws.com"
}
Action = [
"kafka:Describe*",
"kafka:Get*",
"kafka:CreateVpcConnection",
"kafka:GetBootstrapBrokers",
]
Resource = module.msk_cluster.arn
}]
})
}

module "firehose" {
source = "../../../"
name = "${var.name_prefix}-delivery-stream"
input_source = "msk"
msk_source_cluster_arn = module.msk_cluster.arn
msk_source_topic_name = "test"
destination = "s3"
s3_bucket_arn = aws_s3_bucket.s3.arn
}
24 changes: 24 additions & 0 deletions examples/s3/msk-to-s3/outputs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#output "kinesis_firehose_arn" {
# description = "The ARN of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_arn
#}
#
#output "kinesis_data_stream_name" {
# description = "The name of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_name
#}
#
#output "kinesis_firehose_destination_id" {
# description = "The Destination id of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_destination_id
#}
#
#output "kinesis_firehose_version_id" {
# description = "The Version id of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_version_id
#}
#
#output "kinesis_firehose_role_arn" {
# description = "The ARN of the IAM role created for Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_role_arn
#}
5 changes: 5 additions & 0 deletions examples/s3/msk-to-s3/variables.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
variable "name_prefix" {
description = "Name prefix to use in resources"
type = string
default = "msk-to-s3-basic"
}
14 changes: 14 additions & 0 deletions examples/s3/msk-to-s3/versions.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
terraform {
required_version = ">= 0.13.1"

required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
random = {
source = "hashicorp/random"
version = "~> 3.0"
}
}
}
56 changes: 55 additions & 1 deletion iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ locals {
application_role_name = coalesce(var.application_role_name, "${var.name}-application-role", "*")
create_application_role_policy = var.create && var.create_application_role_policy
add_backup_policies = local.enable_s3_backup && var.s3_backup_use_existing_role
add_kinesis_source_policy = var.create && var.create_role && local.is_kinesis_source && var.kinesis_source_use_existing_role
add_kinesis_source_policy = var.create && var.create_role && local.is_kinesis_source && var.kinesis_source_use_existing_role && var.source_use_existing_role
add_msk_source_policy = var.create && var.create_role && local.is_msk_source && var.source_use_existing_role
add_lambda_policy = var.create && var.create_role && var.enable_lambda_transform
add_s3_kms_policy = var.create && var.create_role && ((local.add_backup_policies && var.s3_backup_enable_encryption) || var.enable_s3_encryption)
add_glue_policy = var.create && var.create_role && var.enable_data_format_conversion && var.data_format_conversion_glue_use_existing_role
Expand Down Expand Up @@ -100,6 +101,59 @@ resource "aws_iam_role_policy_attachment" "kinesis" {
policy_arn = aws_iam_policy.kinesis[0].arn
}

##################
# MSK Source
##################
data "aws_iam_policy_document" "msk" {
count = local.add_msk_source_policy ? 1 : 0
statement {
effect = "Allow"
actions = [
"kafka:GetBootstrapBrokers",
"kafka:DescribeCluster",
"kafka:DescribeClusterV2",
"kafka-cluster:Connect"
]
resources = [var.msk_source_cluster_arn]
}

statement {
effect = "Allow"
actions = [
"kafka-cluster:DescribeTopic",
"kafka-cluster:DescribeTopicDynamicConfiguration",
"kafka-cluster:ReadData"
]
resources = [
"${var.msk_source_cluster_arn}/${var.msk_source_topic_name}"
]
}

statement {
effect = "Allow"
actions = [
"kafka-cluster:DescribeGroup"
]
resources = [
"${var.msk_source_cluster_arn}/*"
]
}
}

resource "aws_iam_policy" "msk" {
count = local.is_msk_source ? 1 : 0
name = "${local.role_name}-msk"
path = var.policy_path
policy = data.aws_iam_policy_document.msk[0].json
tags = var.tags
}

resource "aws_iam_role_policy_attachment" "msk" {
count = local.add_msk_source_policy ? 1 : 0
role = aws_iam_role.firehose[0].name
policy_arn = aws_iam_policy.msk[0].arn
}

##################
# Lambda
##################
Expand Down
13 changes: 9 additions & 4 deletions locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ locals {
cw_log_group_name = "/aws/kinesisfirehose/${var.name}"
cw_log_delivery_stream_name = "DestinationDelivery"
cw_log_backup_stream_name = "BackupDelivery"
source = var.input_source
is_kinesis_source = local.source == "kinesis" ? true : false
is_waf_source = local.source == "waf" ? true : false
is_kinesis_source = var.input_source == "kinesis" ? true : false
is_waf_source = var.input_source == "waf" ? true : false
is_msk_source = var.input_source == "msk" ? true : false
destinations = {
s3 : "extended_s3",
extended_s3 : "extended_s3",
Expand Down Expand Up @@ -150,8 +150,13 @@ locals {
}
s3_backup_mode = local.use_backup_vars_in_s3_configuration ? local.backup_modes[local.destination][var.s3_backup_mode] : null

# Common Source Variables
source_role = (local.is_kinesis_source || local.is_msk_source ? (
var.source_use_existing_role ? local.firehose_role_arn : var.source_role_arn
) : null)

# Kinesis source Stream
kinesis_source_stream_role = (local.is_kinesis_source ? (
kinesis_source_stream_role = (local.is_kinesis_source ? ( # TODO: Deprecated. Remove Next Major Version
var.kinesis_source_use_existing_role ? local.firehose_role_arn : var.kinesis_source_role_arn
) : null)

Expand Down
14 changes: 13 additions & 1 deletion main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ resource "aws_kinesis_firehose_delivery_stream" "this" {
for_each = local.is_kinesis_source ? [1] : []
content {
kinesis_stream_arn = var.kinesis_source_stream_arn
role_arn = local.kinesis_source_stream_role
role_arn = var.kinesis_source_use_existing_role ? local.source_role : local.kinesis_source_stream_role # TODO: Next Major version, role should be equals to local.source_role
}
}

dynamic "msk_source_configuration" {
for_each = local.is_msk_source ? [1] : []
content {
authentication_configuration {
connectivity = var.msk_source_connectivity_type
role_arn = local.source_role
}
msk_cluster_arn = var.msk_source_cluster_arn
topic_name = var.msk_source_topic_name
}
}

Expand Down
Loading

0 comments on commit 9262a54

Please sign in to comment.