From f50342cf660fc49f5c00fe87b876913fc1b55e2f Mon Sep 17 00:00:00 2001
From: Fabio Santos
Date: Tue, 13 Sep 2022 18:05:31 +0100
Subject: [PATCH] feat: Add support for Redshift destination

---
 README.md                                     |  61 +-
 .../redshift/direct-put-to-redshift/README.md |  67 +++
 .../redshift/direct-put-to-redshift/main.tf   |  49 ++
 .../direct-put-to-redshift/outputs.tf         |  19 +
 .../direct-put-to-redshift/redshift_table.sql |   7 +
 .../direct-put-to-redshift/variables.tf       |  19 +
 .../direct-put-to-redshift/versions.tf        |  14 +
 examples/s3/direct-put-to-s3/main.tf          |   3 +-
 iam.tf                                        |  22 +-
 main.tf                                       |  86 ++-
 variables.tf                                  | 562 ++++++++++--------
 11 files changed, 645 insertions(+), 264 deletions(-)
 create mode 100644 examples/redshift/direct-put-to-redshift/README.md
 create mode 100644 examples/redshift/direct-put-to-redshift/main.tf
 create mode 100644 examples/redshift/direct-put-to-redshift/outputs.tf
 create mode 100644 examples/redshift/direct-put-to-redshift/redshift_table.sql
 create mode 100644 examples/redshift/direct-put-to-redshift/variables.tf
 create mode 100644 examples/redshift/direct-put-to-redshift/versions.tf

diff --git a/README.md b/README.md
index 9eaa098..28a6853 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,18 @@
 # AWS Kinesis Firehose Terraform module
 
-Terraform module, which creates a Kinesis Firehose Stream and others resources like Cloudwatch and IAM Role that integrate with Kinesis Firehose.
+Terraform module which creates a Kinesis Firehose Stream and related resources, such as CloudWatch log groups and IAM roles, that integrate with Kinesis Firehose.
 
 ## Features
 
-- Kinesis Data Stream or Direct Put as source.
-- S3 Destination.
+- Sources
+  - Kinesis Data Stream
+  - Direct Put
+- Destinations
+  - S3 Destination
+    - Data Format Conversion
+    - Dynamic Partition
+  - Redshift
 - Data Transformation With Lambda
-- Data Format Conversion
-- Dynamic Partition
 - S3 Backup
 - Logging and Encryption
 
@@ -45,7 +49,27 @@ module "firehose" {
 }
 ```
 
-### Lambda Transformation
+### Redshift Destination
+
+```hcl
+module "firehose" {
+  source                      = "fdmsantos/kinesis-firehose/aws"
+  name                        = "firehose-delivery-stream"
+  destination                 = "redshift"
+  s3_bucket_arn               = ""
+  redshift_cluster_identifier = ""
+  redshift_cluster_endpoint   = ""
+  redshift_database_name      = ""
+  redshift_username           = ""
+  redshift_password           = ""
+  redshift_table_name         = ""
+  redshift_copy_options       = "json 'auto ignorecase'"
+}
+```
+
+### Data Transformation with Lambda
+
+**Note:** All Lambda data transformation variables start with the prefix `transform_lambda`.
 
 ```hcl
 module "firehose" {
@@ -66,6 +90,8 @@ module "firehose" {
 
 ### Data Format Conversion
 
+**Note:** All data format conversion variables start with the prefix `data_format_conversion`.
+
 ```hcl
 module "firehose" {
   source = "fdmsantos/kinesis-firehose/aws"
@@ -85,6 +111,8 @@ module "firehose" {
 
 ### Dynamic Partition
 
+**Note:** All dynamic partitioning variables start with the prefix `dynamic_partition`.
+
 ```hcl
 module "firehose" {
   source = "fdmsantos/kinesis-firehose/aws"
@@ -108,7 +136,7 @@ module "firehose" {
 - [Direct Put](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/s3/direct-put-to-s3) - Creates an encrypted Kinesis firehose stream with Direct Put as source and S3 as destination.
 - [Kinesis Data Stream Source](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/s3/kinesis-to-s3-basic) - Creates a basic Kinesis Firehose stream with Kinesis data stream as source and s3 as destination . 
- [S3 Destination Complete](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/s3/kinesis-to-s3-complete) - Creates a Kinesis Firehose Stream with all features enabled. - +- [Redshift](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/examples/redshift/direct-put-to-redshift) - Creates a Kinesis Firehose Stream with redshift as destination. ## Requirements @@ -149,6 +177,7 @@ No modules. | [aws_iam_role_policy_attachment.s3](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource | | [aws_iam_role_policy_attachment.s3_kms](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource | | [aws_kinesis_firehose_delivery_stream.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kinesis_firehose_delivery_stream) | resource | +| [aws_redshift_cluster_iam_roles.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/redshift_cluster_iam_roles) | resource | | [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source | | [aws_iam_policy_document.assume_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | | [aws_iam_policy_document.cw](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | @@ -163,6 +192,7 @@ No modules. | Name | Description | Type | Default | Required | |------|-------------|------|---------|:--------:| +| [associate\_role\_to\_redshift\_cluster](#input\_associate\_role\_to\_redshift\_cluster) | Set it to false if don't want the module associate the role to redshift cluster | `bool` | `true` | no | | [buffer\_interval](#input\_buffer\_interval) | Buffer incoming data for the specified period of time, in seconds, before delivering it to the destination | `number` | `300` | no | | [buffer\_size](#input\_buffer\_size) | Buffer incoming data to the specified size, in MBs, before delivering it to the destination. | `number` | `5` | no | | [create\_destination\_cw\_log\_group](#input\_create\_destination\_cw\_log\_group) | Enables or disables the cloudwatch log group creation to destination | `bool` | `true` | no | @@ -203,7 +233,7 @@ No modules. | [dynamic\_partition\_append\_delimiter\_to\_record](#input\_dynamic\_partition\_append\_delimiter\_to\_record) | To configure your delivery stream to add a new line delimiter between records in objects that are delivered to Amazon S3. | `bool` | `false` | no | | [dynamic\_partition\_enable\_record\_deaggregation](#input\_dynamic\_partition\_enable\_record\_deaggregation) | Data deaggregation is the process of parsing through the records in a delivery stream and separating the records based either on valid JSON or on the specified delimiter | `bool` | `false` | no | | [dynamic\_partition\_metadata\_extractor\_query](#input\_dynamic\_partition\_metadata\_extractor\_query) | Dynamic Partition JQ query. | `string` | `null` | no | -| [dynamic\_partition\_record\_deaggregation\_delimiter](#input\_dynamic\_partition\_record\_deaggregation\_delimiter) | Specifies the delimiter to be used for parsing through the records in the delivery stream and deaggregating them. 
| `string` | `null` | no | +| [dynamic\_partition\_record\_deaggregation\_delimiter](#input\_dynamic\_partition\_record\_deaggregation\_delimiter) | Specifies the delimiter to be used for parsing through the records in the delivery stream and deaggregating them | `string` | `null` | no | | [dynamic\_partition\_record\_deaggregation\_type](#input\_dynamic\_partition\_record\_deaggregation\_type) | Data deaggregation is the process of parsing through the records in a delivery stream and separating the records based either on valid JSON or on the specified delimiter | `string` | `"JSON"` | no | | [dynamic\_partitioning\_retry\_duration](#input\_dynamic\_partitioning\_retry\_duration) | Total amount of seconds Firehose spends on retries | `number` | `300` | no | | [enable\_data\_format\_conversion](#input\_enable\_data\_format\_conversion) | Set it to true if you want to disable format conversion. | `bool` | `false` | no | @@ -222,6 +252,15 @@ No modules. | [kinesis\_source\_use\_existing\_role](#input\_kinesis\_source\_use\_existing\_role) | Indicates if want use the kinesis firehose role to kinesis data stream access. | `bool` | `true` | no | | [name](#input\_name) | A name to identify the stream. This is unique to the AWS account and region the Stream is created in | `string` | n/a | yes | | [policy\_path](#input\_policy\_path) | Path of policies to that should be added to IAM role for Kinesis Firehose Stream | `string` | `null` | no | +| [redshift\_cluster\_endpoint](#input\_redshift\_cluster\_endpoint) | The redshift endpoint | `string` | `null` | no | +| [redshift\_cluster\_identifier](#input\_redshift\_cluster\_identifier) | Redshift Cluster identifier. Necessary to associate the iam role to cluster | `string` | `null` | no | +| [redshift\_copy\_options](#input\_redshift\_copy\_options) | Copy options for copying the data from the s3 intermediate bucket into redshift, for example to change the default delimiter | `string` | `null` | no | +| [redshift\_data\_table\_columns](#input\_redshift\_data\_table\_columns) | The data table columns that will be targeted by the copy command | `string` | `null` | no | +| [redshift\_database\_name](#input\_redshift\_database\_name) | The redshift database name | `string` | `null` | no | +| [redshift\_password](#input\_redshift\_password) | The password for the redshift username above | `string` | `null` | no | +| [redshift\_retry\_duration](#input\_redshift\_retry\_duration) | The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt | `string` | `3600` | no | +| [redshift\_table\_name](#input\_redshift\_table\_name) | The name of the table in the redshift cluster that the s3 bucket will copy to | `string` | `null` | no | +| [redshift\_username](#input\_redshift\_username) | The username that the firehose delivery stream will assume. 
It is strongly recommended that the username and password provided is used exclusively for Amazon Kinesis Firehose purposes, and that the permissions for the account are restricted for Amazon Redshift INSERT permissions | `string` | `null` | no |
 | [role\_description](#input\_role\_description) | Description of IAM role to use for Kinesis Firehose Stream | `string` | `null` | no |
 | [role\_force\_detach\_policies](#input\_role\_force\_detach\_policies) | Specifies to force detaching any policies the IAM role has before destroying it | `bool` | `true` | no |
 | [role\_name](#input\_role\_name) | Name of IAM role to use for Kinesis Firehose Stream | `string` | `null` | no |
@@ -272,6 +311,12 @@ No modules.
 | [kinesis\_firehose\_version\_id](#output\_kinesis\_firehose\_version\_id) | The Version id of the Kinesis Firehose Stream |
 
+## Work in Progress
+
+- Elasticsearch / OpenSearch Destination
+- HTTP Endpoint Destination
+- Other supported destinations
+
 ## License
 
 Apache 2 Licensed. See [LICENSE](https://github.com/fdmsantos/terraform-aws-kinesis-firehose/tree/main/LICENSE) for full details.
 
diff --git a/examples/redshift/direct-put-to-redshift/README.md b/examples/redshift/direct-put-to-redshift/README.md
new file mode 100644
index 0000000..30981c4
--- /dev/null
+++ b/examples/redshift/direct-put-to-redshift/README.md
@@ -0,0 +1,67 @@
+# Kinesis Firehose: Direct Put To Redshift
+
+Configuration in this directory creates a Kinesis Firehose stream with Direct Put as source and Redshift as destination.
+
+This example is ready to be tested with the Demo Data available in the Kinesis Firehose console.
+
+## Usage
+
+To run this example you need to execute:
+
+```bash
+$ terraform init
+$ terraform plan
+$ terraform apply
+```
+
+Note that this example may create resources which cost money. Run `terraform destroy` when you don't need these resources.
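+
+You can also push a test record from the command line instead of using the console demo data. The sketch below assumes the default `name_prefix` (so the stream is named `direct-put-to-redshift-delivery-stream`) and AWS CLI v2, which expects the `Data` blob to be base64-encoded:
+
+```bash
+# Encode one JSON record that matches the firehose_test_table columns;
+# tr strips the line breaks that base64 may add to long output.
+$ DATA=$(echo '{"ticker_symbol":"QXZ","sector":"HEALTHCARE","change":-0.05,"price":84.51}' | base64 | tr -d '\n')
+$ aws firehose put-record \
+    --delivery-stream-name direct-put-to-redshift-delivery-stream \
+    --record "Data=$DATA"
+```
+
+With the 60-second buffer interval configured in this example, the row should appear in `firehose_test_table` within a minute or two.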
+ +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | >= 0.13.1 | +| [aws](#requirement\_aws) | >= 4.4 | +| [random](#requirement\_random) | >= 2.0 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | >= 4.4 | +| [random](#provider\_random) | >= 2.0 | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [firehose](#module\_firehose) | ../../../ | n/a | + +## Resources + +| Name | Type | +|------|------| +| [aws_iam_policy.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_iam_role.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role) | resource | +| [aws_iam_role_policy_attachment.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_role_policy_attachment) | resource | +| [aws_kms_key.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/kms_key) | resource | +| [aws_s3_bucket.s3](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket) | resource | +| [random_pet.this](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/pet) | resource | +| [aws_iam_policy_document.assume_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | +| [aws_iam_policy_document.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [name\_prefix](#input\_name\_prefix) | Name prefix to use in resources | `string` | `"direct-put-to-s3"` | no | + +## Outputs + +| Name | Description | +|------|-------------| +| [kinesis\_firehose\_arn](#output\_kinesis\_firehose\_arn) | The ARN of the Kinesis Firehose Stream | +| [kinesis\_firehose\_destination\_id](#output\_kinesis\_firehose\_destination\_id) | The Destination id of the Kinesis Firehose Stream | +| [kinesis\_firehose\_role\_arn](#output\_kinesis\_firehose\_role\_arn) | The ARN of the IAM role created for Kinesis Firehose Stream | +| [kinesis\_firehose\_version\_id](#output\_kinesis\_firehose\_version\_id) | The Version id of the Kinesis Firehose Stream | + diff --git a/examples/redshift/direct-put-to-redshift/main.tf b/examples/redshift/direct-put-to-redshift/main.tf new file mode 100644 index 0000000..a30325f --- /dev/null +++ b/examples/redshift/direct-put-to-redshift/main.tf @@ -0,0 +1,49 @@ +resource "random_pet" "this" { + length = 2 +} + +resource "aws_s3_bucket" "s3" { + bucket = "${var.name_prefix}-destination-bucket-${random_pet.this.id}" + force_destroy = true +} + +resource "aws_redshift_cluster" "this" { + cluster_identifier = "${var.name_prefix}-redshift-cluster" + database_name = "test" + master_username = var.redshift_username + master_password = var.redshift_password + node_type = "dc2.large" + cluster_type = "single-node" + skip_final_snapshot = true + provisioner "local-exec" { + command = "psql \"postgresql://${self.master_username}:${self.master_password}@${self.endpoint}/${self.database_name}\" -f ./redshift_table.sql" + } +} + +resource "aws_kms_key" "this" { + description = "${var.name_prefix}-kms-key" + deletion_window_in_days = 7 +} + +module "firehose" { + source = "../../../" + name = "${var.name_prefix}-delivery-stream" + destination = "redshift" + s3_bucket_arn = 
aws_s3_bucket.s3.arn + buffer_interval = 60 + redshift_cluster_identifier = aws_redshift_cluster.this.cluster_identifier + redshift_cluster_endpoint = aws_redshift_cluster.this.endpoint + redshift_database_name = aws_redshift_cluster.this.database_name + redshift_username = aws_redshift_cluster.this.master_username + redshift_password = aws_redshift_cluster.this.master_password + redshift_table_name = "firehose_test_table" + redshift_copy_options = "json 'auto ignorecase'" + enable_s3_backup = true + s3_backup_prefix = "backup/" + s3_backup_bucket_arn = aws_s3_bucket.s3.arn + s3_backup_buffer_interval = 100 + s3_backup_buffer_size = 100 + s3_backup_compression = "GZIP" + s3_backup_enable_encryption = true + s3_backup_kms_key_arn = aws_kms_key.this.arn +} diff --git a/examples/redshift/direct-put-to-redshift/outputs.tf b/examples/redshift/direct-put-to-redshift/outputs.tf new file mode 100644 index 0000000..77e5adc --- /dev/null +++ b/examples/redshift/direct-put-to-redshift/outputs.tf @@ -0,0 +1,19 @@ +output "kinesis_firehose_arn" { + description = "The ARN of the Kinesis Firehose Stream" + value = module.firehose.kinesis_firehose_arn +} + +output "kinesis_firehose_destination_id" { + description = "The Destination id of the Kinesis Firehose Stream" + value = module.firehose.kinesis_firehose_destination_id +} + +output "kinesis_firehose_version_id" { + description = "The Version id of the Kinesis Firehose Stream" + value = module.firehose.kinesis_firehose_version_id +} + +output "kinesis_firehose_role_arn" { + description = "The ARN of the IAM role created for Kinesis Firehose Stream" + value = module.firehose.kinesis_firehose_role_arn +} diff --git a/examples/redshift/direct-put-to-redshift/redshift_table.sql b/examples/redshift/direct-put-to-redshift/redshift_table.sql new file mode 100644 index 0000000..617f845 --- /dev/null +++ b/examples/redshift/direct-put-to-redshift/redshift_table.sql @@ -0,0 +1,7 @@ +Create table firehose_test_table +( + ticker_symbol varchar(4), + sector varchar(16), + change float, + price float +); diff --git a/examples/redshift/direct-put-to-redshift/variables.tf b/examples/redshift/direct-put-to-redshift/variables.tf new file mode 100644 index 0000000..cbe8b1d --- /dev/null +++ b/examples/redshift/direct-put-to-redshift/variables.tf @@ -0,0 +1,19 @@ +variable "name_prefix" { + description = "Name prefix to use in resources" + type = string + default = "direct-put-to-redshift" +} + +variable "redshift_username" { + description = "The username that the firehose delivery stream will assume. 
It is strongly recommended that the username and password provided is used exclusively for Amazon Kinesis Firehose purposes, and that the permissions for the account are restricted for Amazon Redshift INSERT permissions" + type = string + default = null + sensitive = true +} + +variable "redshift_password" { + description = "The password for the redshift username above" + type = string + default = null + sensitive = true +} diff --git a/examples/redshift/direct-put-to-redshift/versions.tf b/examples/redshift/direct-put-to-redshift/versions.tf new file mode 100644 index 0000000..5c3fca6 --- /dev/null +++ b/examples/redshift/direct-put-to-redshift/versions.tf @@ -0,0 +1,14 @@ +terraform { + required_version = ">= 0.13.1" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 4.4" + } + random = { + source = "hashicorp/random" + version = ">= 2.0" + } + } +} diff --git a/examples/s3/direct-put-to-s3/main.tf b/examples/s3/direct-put-to-s3/main.tf index 1f79d03..e8d112a 100644 --- a/examples/s3/direct-put-to-s3/main.tf +++ b/examples/s3/direct-put-to-s3/main.tf @@ -57,7 +57,6 @@ module "firehose" { enable_sse = true sse_kms_key_type = "CUSTOMER_MANAGED_CMK" sse_kms_key_arn = aws_kms_key.this.arn - enable_destination_log = true enable_s3_backup = true s3_backup_bucket_arn = aws_s3_bucket.s3.arn s3_backup_prefix = "backup/" @@ -65,7 +64,7 @@ module "firehose" { s3_backup_buffer_interval = 100 s3_backup_buffer_size = 100 s3_backup_compression = "GZIP" - s3_backup_use_existing_role = true + s3_backup_use_existing_role = false s3_backup_role_arn = aws_iam_role.this.arn s3_backup_enable_encryption = true s3_backup_kms_key_arn = aws_kms_key.this.arn diff --git a/iam.tf b/iam.tf index 26d62ab..5df6016 100644 --- a/iam.tf +++ b/iam.tf @@ -4,10 +4,10 @@ locals { add_kinesis_source_policy = var.create_role && var.enable_kinesis_source && var.kinesis_source_use_existing_role add_lambda_policy = var.create_role && var.enable_lambda_transform add_s3_kms_policy = var.create_role && ((local.add_backup_policies && var.s3_backup_enable_encryption) || var.enable_s3_encryption) + add_glue_policy = var.create_role && var.enable_data_format_conversion && var.data_format_conversion_glue_use_existing_role + add_s3_policy = var.create_role + add_cw_policy = var.create_role && ((local.add_backup_policies && var.s3_backup_enable_log) || var.enable_destination_log) # add_sse_kms_policy = var.create_role && var.enable_sse && var.sse_kms_key_type == "CUSTOMER_MANAGED_CMK" - add_glue_policy = var.create_role && var.enable_data_format_conversion && var.data_format_conversion_glue_use_existing_role - add_s3_policy = var.create_role && (local.s3_destination || local.add_backup_policies) - add_cw_policy = var.create_role && ((local.add_backup_policies && var.s3_backup_enable_log) || var.enable_destination_log) } data "aws_iam_policy_document" "assume_role" { @@ -18,8 +18,11 @@ data "aws_iam_policy_document" "assume_role" { actions = ["sts:AssumeRole"] principals { - type = "Service" - identifiers = ["firehose.amazonaws.com"] + type = "Service" + identifiers = compact([ + "firehose.amazonaws.com", + var.destination == "redshift" ? 
"redshift.amazonaws.com" : "", + ]) } } } @@ -316,3 +319,12 @@ resource "aws_iam_role_policy_attachment" "cw" { role = aws_iam_role.firehose[0].name policy_arn = aws_iam_policy.cw[0].arn } + +################## +# Redshift +################## +resource "aws_redshift_cluster_iam_roles" "this" { + count = var.create_role && var.destination == "redshift" && var.associate_role_to_redshift_cluster ? 1 : 0 + cluster_identifier = var.redshift_cluster_identifier + iam_role_arns = [aws_iam_role.firehose[0].arn] +} diff --git a/main.tf b/main.tf index 0747c3d..6ab714c 100644 --- a/main.tf +++ b/main.tf @@ -96,7 +96,7 @@ locals { # S3 Backup s3_backup_mode = var.enable_s3_backup ? "Enabled" : "Disabled" - s3_backup_mode_role_arn = (var.enable_s3_backup ? ( + s3_backup_role_arn = (var.enable_s3_backup ? ( var.s3_backup_use_existing_role ? local.firehose_role_arn : var.s3_backup_role_arn ) : null) s3_backup_cw_log_group_name = var.create_destination_cw_log_group ? local.cw_log_group_name : var.s3_backup_log_group_name @@ -129,7 +129,7 @@ resource "aws_kinesis_firehose_delivery_stream" "this" { } dynamic "server_side_encryption" { - for_each = var.enable_sse ? [1] : [] + for_each = !var.enable_kinesis_source && var.enable_sse ? [1] : [] content { enabled = var.enable_sse key_arn = var.sse_kms_key_arn @@ -246,7 +246,7 @@ resource "aws_kinesis_firehose_delivery_stream" "this" { for_each = var.enable_s3_backup ? [1] : [] content { bucket_arn = var.s3_backup_bucket_arn - role_arn = local.s3_backup_mode_role_arn + role_arn = local.s3_backup_role_arn prefix = var.s3_backup_prefix buffer_size = var.s3_backup_buffer_size buffer_interval = var.s3_backup_buffer_interval @@ -272,6 +272,86 @@ resource "aws_kinesis_firehose_delivery_stream" "this" { } } + dynamic "s3_configuration" { + for_each = !local.s3_destination ? [1] : [] + content { + role_arn = local.firehose_role_arn + bucket_arn = var.s3_bucket_arn + buffer_size = var.buffer_size + buffer_interval = var.buffer_interval + compression_format = var.s3_compression_format + prefix = var.s3_prefix + error_output_prefix = var.s3_error_output_prefix + kms_key_arn = var.enable_s3_encryption ? var.s3_kms_key_arn : null + } + + } + + dynamic "redshift_configuration" { + for_each = var.destination == "redshift" ? [1] : [] + content { + role_arn = local.firehose_role_arn + cluster_jdbcurl = "jdbc:redshift://${var.redshift_cluster_endpoint}/${var.redshift_database_name}" + username = var.redshift_username + password = var.redshift_password + data_table_name = var.redshift_table_name + copy_options = var.redshift_copy_options + data_table_columns = var.redshift_data_table_columns + s3_backup_mode = local.s3_backup_mode + retry_duration = var.redshift_retry_duration + + dynamic "s3_backup_configuration" { + for_each = var.enable_s3_backup ? [1] : [] + content { + bucket_arn = var.s3_backup_bucket_arn + role_arn = local.s3_backup_role_arn + prefix = var.s3_backup_prefix + buffer_size = var.s3_backup_buffer_size + buffer_interval = var.s3_backup_buffer_interval + compression_format = var.s3_backup_compression + error_output_prefix = var.s3_backup_error_output_prefix + kms_key_arn = var.s3_backup_enable_encryption ? var.s3_backup_kms_key_arn : null + cloudwatch_logging_options { + enabled = var.s3_backup_enable_log + log_group_name = local.s3_backup_cw_log_group_name + log_stream_name = local.s3_backup_cw_log_stream_name + } + } + } + + dynamic "cloudwatch_logging_options" { + for_each = var.enable_destination_log ? 
[1] : [] + content { + enabled = var.enable_destination_log + log_group_name = local.destination_cw_log_group_name + log_stream_name = local.destination_cw_log_stream_name + } + } + + dynamic "processing_configuration" { + for_each = local.enable_processing ? [1] : [] + content { + enabled = local.enable_processing + dynamic "processors" { + for_each = local.processors + content { + type = processors.value["type"] + dynamic "parameters" { + for_each = processors.value["parameters"] + content { + parameter_name = parameters.value["name"] + parameter_value = parameters.value["value"] + } + } + } + } + } + } + + } + + } + tags = var.tags } diff --git a/variables.tf b/variables.tf index 53c732a..8f38412 100644 --- a/variables.tf +++ b/variables.tf @@ -9,7 +9,7 @@ variable "destination" { validation { error_message = "Please use a valid destination!" - condition = contains(["extended_s3"], var.destination) + condition = contains(["extended_s3", "redshift"], var.destination) } } @@ -19,6 +19,11 @@ variable "create_role" { default = true } +variable "tags" { + description = "A map of tags to assign to resources." + type = map(string) + default = {} +} ###### # All Destinations ###### @@ -84,232 +89,6 @@ variable "transform_lambda_number_retries" { } } -variable "enable_data_format_conversion" { - description = "Set it to true if you want to disable format conversion." - type = bool - default = false -} - -variable "data_format_conversion_glue_database" { - description = "Name of the AWS Glue database that contains the schema for the output data." - type = string - default = null -} - -variable "data_format_conversion_glue_use_existing_role" { - description = "Indicates if want use the kinesis firehose role to glue access." - type = bool - default = true -} - -variable "data_format_conversion_glue_role_arn" { - description = "The role that Kinesis Data Firehose can use to access AWS Glue. This role must be in the same account you use for Kinesis Data Firehose. Cross-account roles aren't allowed." - type = string - default = null -} - -variable "data_format_conversion_glue_table_name" { - description = "Specifies the AWS Glue table that contains the column information that constitutes your data schema" - type = string - default = null -} - -variable "data_format_conversion_glue_catalog_id" { - description = "The ID of the AWS Glue Data Catalog. If you don't supply this, the AWS account ID is used by default." - type = string - default = null -} - -variable "data_format_conversion_glue_region" { - description = "If you don't specify an AWS Region, the default is the current region." - type = string - default = null -} - -variable "data_format_conversion_glue_version_id" { - description = "Specifies the table version for the output data schema." - type = string - default = "LATEST" -} - -variable "data_format_conversion_input_format" { - description = "Specifies which deserializer to use. You can choose either the Apache Hive JSON SerDe or the OpenX JSON SerDe" - type = string - default = "OpenX" - validation { - error_message = "Valid values are HIVE and OPENX." - condition = contains(["HIVE", "OpenX"], var.data_format_conversion_input_format) - } -} - -variable "data_format_conversion_openx_case_insensitive" { - description = "When set to true, Kinesis Data Firehose converts JSON keys to lowercase before deserializing them." 
- type = bool - default = true -} - -variable "data_format_conversion_openx_convert_dots_to_underscores" { - description = "Specifies that the names of the keys include dots and that you want Kinesis Data Firehose to replace them with underscores. This is useful because Apache Hive does not allow dots in column names." - type = bool - default = false -} - -variable "data_format_conversion_openx_column_to_json_key_mappings" { - description = "A map of column names to JSON keys that aren't identical to the column names. This is useful when the JSON contains keys that are Hive keywords." - type = map(string) - default = null -} - -variable "data_format_conversion_hive_timestamps" { - description = "A list of how you want Kinesis Data Firehose to parse the date and time stamps that may be present in your input data JSON. To specify these format strings, follow the pattern syntax of JodaTime's DateTimeFormat format strings." - type = list(string) - default = [] -} - -variable "data_format_conversion_output_format" { - description = "Specifies which serializer to use. You can choose either the ORC SerDe or the Parquet SerDe" - type = string - default = "PARQUET" - validation { - error_message = "Valid values are ORC and PARQUET." - condition = contains(["ORC", "PARQUET"], var.data_format_conversion_output_format) - } -} - -variable "data_format_conversion_block_size" { - description = "The Hadoop Distributed File System (HDFS) block size. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying. The Value is in Bytes." - type = number - default = 268435456 - validation { - error_message = "Minimum Value is 64 MiB." - condition = var.data_format_conversion_block_size >= 67108864 - } -} - -variable "data_format_conversion_parquet_compression" { - description = "The compression code to use over data blocks." - type = string - default = "SNAPPY" - validation { - error_message = "Valid values are UNCOMPRESSED, SNAPPY and GZIP." - condition = contains(["UNCOMPRESSED", "SNAPPY", "GZIP"], var.data_format_conversion_parquet_compression) - } -} - -variable "data_format_conversion_parquet_dict_compression" { - description = "Indicates whether to enable dictionary compression." - type = bool - default = false -} - -variable "data_format_conversion_parquet_max_padding" { - description = "The maximum amount of padding to apply. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying. The value is in bytes" - type = number - default = 0 -} - -variable "data_format_conversion_parquet_page_size" { - description = "Column chunks are divided into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). The value is in bytes" - type = number - default = 1048576 - validation { - error_message = "Minimum Value is 64 KiB." - condition = var.data_format_conversion_parquet_page_size >= 65536 - } -} - -variable "data_format_conversion_parquet_writer_version" { - description = "Indicates the version of row format to output." - type = string - default = "V1" - validation { - error_message = "Valid values are V1 and V2." - condition = contains(["V1", "V2"], var.data_format_conversion_parquet_writer_version) - } -} - -variable "data_format_conversion_orc_compression" { - description = "The compression code to use over data blocks." - type = string - default = "SNAPPY" - validation { - error_message = "Valid values are NONE, ZLIB and SNAPPY." 
- condition = contains(["NONE", "ZLIB", "SNAPPY"], var.data_format_conversion_orc_compression) - } -} - -variable "data_format_conversion_orc_format_version" { - description = "The version of the file to write." - type = string - default = "V0_12" - validation { - error_message = "Valid values are V0_11 and V0_12." - condition = contains(["V0_11", "V0_12"], var.data_format_conversion_orc_format_version) - } -} - -variable "data_format_conversion_orc_enable_padding" { - description = "Set this to true to indicate that you want stripes to be padded to the HDFS block boundaries. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying." - type = bool - default = false -} - -variable "data_format_conversion_orc_padding_tolerance" { - description = "A float between 0 and 1 that defines the tolerance for block padding as a decimal fraction of stripe size." - type = number - default = 0.05 - validation { - error_message = "Valid values are V0_11 and V0_12." - condition = var.data_format_conversion_orc_padding_tolerance >= 0 && var.data_format_conversion_orc_padding_tolerance <= 1 - } -} - -variable "data_format_conversion_orc_dict_key_threshold" { - description = "A float that represents the fraction of the total number of non-null rows. To turn off dictionary encoding, set this fraction to a number that is less than the number of distinct keys in a dictionary. To always use dictionary encoding, set this threshold to 1." - type = number - default = 0.0 - validation { - error_message = "Valid values are between 0 and 1." - condition = var.data_format_conversion_orc_dict_key_threshold >= 0 && var.data_format_conversion_orc_dict_key_threshold <= 1 - } -} - -variable "data_format_conversion_orc_bloom_filter_columns" { - description = "A list of column names for which you want Kinesis Data Firehose to create bloom filters." - type = list(string) - default = [] -} - -variable "data_format_conversion_orc_bloom_filter_false_positive_probability" { - description = "The Bloom filter false positive probability (FPP). The lower the FPP, the bigger the Bloom filter." - type = number - default = 0.05 - validation { - error_message = "Valid values are between 0 and 1." - condition = var.data_format_conversion_orc_bloom_filter_false_positive_probability >= 0 && var.data_format_conversion_orc_bloom_filter_false_positive_probability <= 1 - } -} - -variable "data_format_conversion_orc_row_index_stripe" { - description = "The number of rows between index entries." - type = number - default = 10000 - validation { - error_message = "Minimum value is 1000." - condition = var.data_format_conversion_orc_row_index_stripe >= 1000 - } -} - -variable "data_format_conversion_orc_stripe_size" { - description = "he number of bytes in each strip." - type = number - default = 67108864 - validation { - error_message = "Minimum Value is 8 MiB." - condition = var.data_format_conversion_orc_stripe_size >= 8388608 - } -} - variable "enable_s3_backup" { description = "The Amazon S3 backup mode" type = bool @@ -454,6 +233,18 @@ variable "s3_bucket_arn" { default = null } +variable "s3_prefix" { + description = "The YYYY/MM/DD/HH time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket" + type = string + default = null +} + +variable "s3_error_output_prefix" { + description = "Prefix added to failed records before writing them to S3. 
This prefix appears immediately following the bucket name." + type = string + default = null +} + variable "enable_s3_encryption" { description = "Indicates if want use encryption in S3 bucket." type = bool @@ -476,6 +267,9 @@ variable "s3_compression_format" { } } +###### +# Kinesis Source +###### variable "enable_sse" { description = "Whether to enable encryption at rest. Only makes sense when source is Direct Put" type = bool @@ -534,27 +328,9 @@ variable "kinesis_source_kms_arn" { default = null } -variable "tags" { - description = "A map of tags to assign to resources." - type = map(string) - default = {} -} - ###### # S3 Destination Configurations ###### -variable "s3_prefix" { - description = "The YYYY/MM/DD/HH time format prefix is automatically used for delivered S3 files. You can specify an extra prefix to be added in front of the time format prefix. Note that if the prefix ends with a slash, it appears as a folder in the S3 bucket" - type = string - default = null -} - -variable "s3_error_output_prefix" { - description = "Prefix added to failed records before writing them to S3. This prefix appears immediately following the bucket name." - type = string - default = null -} - variable "enable_dynamic_partitioning" { description = "Enables or disables dynamic partitioning" type = bool @@ -600,11 +376,305 @@ variable "dynamic_partition_record_deaggregation_type" { } variable "dynamic_partition_record_deaggregation_delimiter" { - description = "Specifies the delimiter to be used for parsing through the records in the delivery stream and deaggregating them." + description = "Specifies the delimiter to be used for parsing through the records in the delivery stream and deaggregating them" type = string default = null } +variable "enable_data_format_conversion" { + description = "Set it to true if you want to disable format conversion." + type = bool + default = false +} + +variable "data_format_conversion_glue_database" { + description = "Name of the AWS Glue database that contains the schema for the output data." + type = string + default = null +} + +variable "data_format_conversion_glue_use_existing_role" { + description = "Indicates if want use the kinesis firehose role to glue access." + type = bool + default = true +} + +variable "data_format_conversion_glue_role_arn" { + description = "The role that Kinesis Data Firehose can use to access AWS Glue. This role must be in the same account you use for Kinesis Data Firehose. Cross-account roles aren't allowed." + type = string + default = null +} + +variable "data_format_conversion_glue_table_name" { + description = "Specifies the AWS Glue table that contains the column information that constitutes your data schema" + type = string + default = null +} + +variable "data_format_conversion_glue_catalog_id" { + description = "The ID of the AWS Glue Data Catalog. If you don't supply this, the AWS account ID is used by default." + type = string + default = null +} + +variable "data_format_conversion_glue_region" { + description = "If you don't specify an AWS Region, the default is the current region." + type = string + default = null +} + +variable "data_format_conversion_glue_version_id" { + description = "Specifies the table version for the output data schema." + type = string + default = "LATEST" +} + +variable "data_format_conversion_input_format" { + description = "Specifies which deserializer to use. 
You can choose either the Apache Hive JSON SerDe or the OpenX JSON SerDe" + type = string + default = "OpenX" + validation { + error_message = "Valid values are HIVE and OPENX." + condition = contains(["HIVE", "OpenX"], var.data_format_conversion_input_format) + } +} + +variable "data_format_conversion_openx_case_insensitive" { + description = "When set to true, Kinesis Data Firehose converts JSON keys to lowercase before deserializing them." + type = bool + default = true +} + +variable "data_format_conversion_openx_convert_dots_to_underscores" { + description = "Specifies that the names of the keys include dots and that you want Kinesis Data Firehose to replace them with underscores. This is useful because Apache Hive does not allow dots in column names." + type = bool + default = false +} + +variable "data_format_conversion_openx_column_to_json_key_mappings" { + description = "A map of column names to JSON keys that aren't identical to the column names. This is useful when the JSON contains keys that are Hive keywords." + type = map(string) + default = null +} + +variable "data_format_conversion_hive_timestamps" { + description = "A list of how you want Kinesis Data Firehose to parse the date and time stamps that may be present in your input data JSON. To specify these format strings, follow the pattern syntax of JodaTime's DateTimeFormat format strings." + type = list(string) + default = [] +} + +variable "data_format_conversion_output_format" { + description = "Specifies which serializer to use. You can choose either the ORC SerDe or the Parquet SerDe" + type = string + default = "PARQUET" + validation { + error_message = "Valid values are ORC and PARQUET." + condition = contains(["ORC", "PARQUET"], var.data_format_conversion_output_format) + } +} + +variable "data_format_conversion_block_size" { + description = "The Hadoop Distributed File System (HDFS) block size. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying. The Value is in Bytes." + type = number + default = 268435456 + validation { + error_message = "Minimum Value is 64 MiB." + condition = var.data_format_conversion_block_size >= 67108864 + } +} + +variable "data_format_conversion_parquet_compression" { + description = "The compression code to use over data blocks." + type = string + default = "SNAPPY" + validation { + error_message = "Valid values are UNCOMPRESSED, SNAPPY and GZIP." + condition = contains(["UNCOMPRESSED", "SNAPPY", "GZIP"], var.data_format_conversion_parquet_compression) + } +} + +variable "data_format_conversion_parquet_dict_compression" { + description = "Indicates whether to enable dictionary compression." + type = bool + default = false +} + +variable "data_format_conversion_parquet_max_padding" { + description = "The maximum amount of padding to apply. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying. The value is in bytes" + type = number + default = 0 +} + +variable "data_format_conversion_parquet_page_size" { + description = "Column chunks are divided into pages. A page is conceptually an indivisible unit (in terms of compression and encoding). The value is in bytes" + type = number + default = 1048576 + validation { + error_message = "Minimum Value is 64 KiB." + condition = var.data_format_conversion_parquet_page_size >= 65536 + } +} + +variable "data_format_conversion_parquet_writer_version" { + description = "Indicates the version of row format to output." 
+ type = string + default = "V1" + validation { + error_message = "Valid values are V1 and V2." + condition = contains(["V1", "V2"], var.data_format_conversion_parquet_writer_version) + } +} + +variable "data_format_conversion_orc_compression" { + description = "The compression code to use over data blocks." + type = string + default = "SNAPPY" + validation { + error_message = "Valid values are NONE, ZLIB and SNAPPY." + condition = contains(["NONE", "ZLIB", "SNAPPY"], var.data_format_conversion_orc_compression) + } +} + +variable "data_format_conversion_orc_format_version" { + description = "The version of the file to write." + type = string + default = "V0_12" + validation { + error_message = "Valid values are V0_11 and V0_12." + condition = contains(["V0_11", "V0_12"], var.data_format_conversion_orc_format_version) + } +} + +variable "data_format_conversion_orc_enable_padding" { + description = "Set this to true to indicate that you want stripes to be padded to the HDFS block boundaries. This is useful if you intend to copy the data from Amazon S3 to HDFS before querying." + type = bool + default = false +} + +variable "data_format_conversion_orc_padding_tolerance" { + description = "A float between 0 and 1 that defines the tolerance for block padding as a decimal fraction of stripe size." + type = number + default = 0.05 + validation { + error_message = "Valid values are V0_11 and V0_12." + condition = var.data_format_conversion_orc_padding_tolerance >= 0 && var.data_format_conversion_orc_padding_tolerance <= 1 + } +} + +variable "data_format_conversion_orc_dict_key_threshold" { + description = "A float that represents the fraction of the total number of non-null rows. To turn off dictionary encoding, set this fraction to a number that is less than the number of distinct keys in a dictionary. To always use dictionary encoding, set this threshold to 1." + type = number + default = 0.0 + validation { + error_message = "Valid values are between 0 and 1." + condition = var.data_format_conversion_orc_dict_key_threshold >= 0 && var.data_format_conversion_orc_dict_key_threshold <= 1 + } +} + +variable "data_format_conversion_orc_bloom_filter_columns" { + description = "A list of column names for which you want Kinesis Data Firehose to create bloom filters." + type = list(string) + default = [] +} + +variable "data_format_conversion_orc_bloom_filter_false_positive_probability" { + description = "The Bloom filter false positive probability (FPP). The lower the FPP, the bigger the Bloom filter." + type = number + default = 0.05 + validation { + error_message = "Valid values are between 0 and 1." + condition = var.data_format_conversion_orc_bloom_filter_false_positive_probability >= 0 && var.data_format_conversion_orc_bloom_filter_false_positive_probability <= 1 + } +} + +variable "data_format_conversion_orc_row_index_stripe" { + description = "The number of rows between index entries." + type = number + default = 10000 + validation { + error_message = "Minimum value is 1000." + condition = var.data_format_conversion_orc_row_index_stripe >= 1000 + } +} + +variable "data_format_conversion_orc_stripe_size" { + description = "he number of bytes in each strip." + type = number + default = 67108864 + validation { + error_message = "Minimum Value is 8 MiB." 
+    condition     = var.data_format_conversion_orc_stripe_size >= 8388608
+  }
+}
+
+######
+# Redshift Destination Variables
+######
+variable "redshift_cluster_endpoint" {
+  description = "The Redshift cluster endpoint"
+  type        = string
+  default     = null
+}
+
+variable "redshift_username" {
+  description = "The username that the firehose delivery stream will assume. It is strongly recommended that the username and password provided is used exclusively for Amazon Kinesis Firehose purposes, and that the permissions for the account are restricted for Amazon Redshift INSERT permissions"
+  type        = string
+  default     = null
+  sensitive   = true
+}
+
+variable "redshift_password" {
+  description = "The password for the Redshift username above"
+  type        = string
+  default     = null
+  sensitive   = true
+}
+
+variable "redshift_database_name" {
+  description = "The Redshift database name"
+  type        = string
+  default     = null
+}
+
+variable "redshift_table_name" {
+  description = "The name of the table in the Redshift cluster that the data from the intermediate S3 bucket will be copied to"
+  type        = string
+  default     = null
+}
+
+variable "redshift_copy_options" {
+  description = "Copy options for copying the data from the intermediate S3 bucket into Redshift, for example to change the default delimiter"
+  type        = string
+  default     = null
+}
+
+variable "redshift_data_table_columns" {
+  description = "The data table columns that will be targeted by the copy command"
+  type        = string
+  default     = null
+}
+
+variable "redshift_retry_duration" {
+  description = "The length of time, in seconds, during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt"
+  type        = string
+  default     = 3600
+  validation {
+    error_message = "Minimum: 0 seconds, maximum: 7200 seconds."
+    condition     = var.redshift_retry_duration >= 0 && var.redshift_retry_duration <= 7200
+  }
+}
+
+variable "redshift_cluster_identifier" {
+  description = "Redshift cluster identifier. Required to associate the IAM role with the cluster"
+  type        = string
+  default     = null
+}
+
+variable "associate_role_to_redshift_cluster" {
+  description = "Set it to false if you don't want the module to associate the IAM role with the Redshift cluster"
+  type        = bool
+  default     = true
+}
 ######
 # IAM
 ######