Skip to content

Commit

Permalink
fix: Fix Incorrect ARNs in MSK Policy
Browse files Browse the repository at this point in the history
  • Loading branch information
fdmsantos committed Feb 18, 2025
1 parent 767f2c5 commit 8707bcb
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 55 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -998,13 +998,15 @@ No modules.
| [aws_iam_policy_document.kinesis](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.msk](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.msk_source_assume_role](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.opensearch](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.opensearchserverless](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.s3](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.s3_kms](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.secretsmanager](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.secretsmanager_cmk_encryption](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.vpc](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_msk_cluster.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/msk_cluster) | data source |
| [aws_region.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region) | data source |
| [aws_subnet.subnet](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/subnet) | data source |

Expand Down
43 changes: 42 additions & 1 deletion examples/s3/msk-to-s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,35 @@ $ terraform apply

Note that this example may create resources which cost money. Run `terraform destroy` when you don't need these resources.

* Send Message to Kafka

[Documentation](https://docs.aws.amazon.com/msk/latest/developerguide/create-serverless-cluster-client.html)

```sh
# Create Client Machine
sudo su -
sudo yum -y install java-11
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
mv aws-msk-iam-auth-1.1.1-all.jar kafka_2.12-2.8.1/libs/
vi kafka_2.12-2.8.1/bin/client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

# Create Topic
export BS=my-endpoint
./kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config kafka_2.12-2.8.1/bin/client.properties --create --topic demo-topic --partitions 6

# Produce data
./kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config kafka_2.12-2.8.1/bin/client.properties --topic demo-topic

# Consume Data
./kafka_2.12-2.8.1/bin/kafka-console-consumer.sh --bootstrap-server $BS --consumer.config kafka_2.12-2.8.1/bin/client.properties --topic demo-topic --from-beginning
```

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Requirements

Expand All @@ -34,6 +63,7 @@ Note that this example may create resources which cost money. Run `terraform des

| Name | Source | Version |
|------|--------|---------|
| <a name="module_ec2"></a> [ec2](#module\_ec2) | terraform-aws-modules/ec2-instance/aws | n/a |
| <a name="module_firehose"></a> [firehose](#module\_firehose) | ../../../ | n/a |
| <a name="module_msk_cluster"></a> [msk\_cluster](#module\_msk\_cluster) | terraform-aws-modules/msk-kafka-cluster/aws | 2.3.0 |
| <a name="module_security_group"></a> [security\_group](#module\_security\_group) | terraform-aws-modules/security-group/aws | ~> 5.0 |
Expand All @@ -53,8 +83,19 @@ Note that this example may create resources which cost money. Run `terraform des
| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_name_prefix"></a> [name\_prefix](#input\_name\_prefix) | Name prefix to use in resources | `string` | `"msk-to-s3-basic"` | no |
| <a name="input_tags"></a> [tags](#input\_tags) | Default Tags to be added to all resources. | `map(string)` | `{}` | no |

## Outputs

No outputs.
| Name | Description |
|------|-------------|
| <a name="output_kinesis_data_stream_name"></a> [kinesis\_data\_stream\_name](#output\_kinesis\_data\_stream\_name) | The name of the Kinesis Firehose Stream |
| <a name="output_kinesis_firehose_arn"></a> [kinesis\_firehose\_arn](#output\_kinesis\_firehose\_arn) | The ARN of the Kinesis Firehose Stream |
| <a name="output_kinesis_firehose_destination_id"></a> [kinesis\_firehose\_destination\_id](#output\_kinesis\_firehose\_destination\_id) | The Destination id of the Kinesis Firehose Stream |
| <a name="output_kinesis_firehose_role_arn"></a> [kinesis\_firehose\_role\_arn](#output\_kinesis\_firehose\_role\_arn) | The ARN of the IAM role created for Kinesis Firehose Stream |
| <a name="output_kinesis_firehose_version_id"></a> [kinesis\_firehose\_version\_id](#output\_kinesis\_firehose\_version\_id) | The Version id of the Kinesis Firehose Stream |
| <a name="output_msk_arn"></a> [msk\_arn](#output\_msk\_arn) | MSK Topic Endpoint |
| <a name="output_msk_brokers_endpoint"></a> [msk\_brokers\_endpoint](#output\_msk\_brokers\_endpoint) | Brokers endpoints |
| <a name="output_s3_bucket_arn"></a> [s3\_bucket\_arn](#output\_s3\_bucket\_arn) | S3 Bucket ARN |
| <a name="output_topic_name"></a> [topic\_name](#output\_topic\_name) | MSK Topic Name |
<!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
76 changes: 50 additions & 26 deletions examples/s3/msk-to-s3/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,62 @@ data "aws_availability_zones" "available" {}
locals {
vpc_cidr = "10.0.0.0/16"
azs = slice(data.aws_availability_zones.available.names, 0, 2)
topic = "demo-topic"
}

resource "random_pet" "this" {
length = 2
}

### S3 Bucket ###
resource "aws_s3_bucket" "s3" {
bucket = "${var.name_prefix}-destination-bucket-${random_pet.this.id}"
force_destroy = true
}

### Networking ###
module "vpc" {
source = "terraform-aws-modules/vpc/aws"
version = "~> 5.0"
name = "${var.name_prefix}-vpc"
cidr = local.vpc_cidr
azs = local.azs
public_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k)]
private_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k + 3)]
database_subnets = [for k, v in local.azs : cidrsubnet(local.vpc_cidr, 8, k + 6)]
create_database_subnet_group = true
enable_nat_gateway = true
single_nat_gateway = true
create_database_subnet_group = false
enable_nat_gateway = false
single_nat_gateway = false
}

module "security_group" {
source = "terraform-aws-modules/security-group/aws"
version = "~> 5.0"

name = "${var.name_prefix}-sg"
description = "Security group for ${var.name_prefix}-sg"
vpc_id = module.vpc.vpc_id

ingress_cidr_blocks = module.vpc.private_subnets_cidr_blocks
source = "terraform-aws-modules/security-group/aws"
version = "~> 5.0"
name = "${var.name_prefix}-sg"
description = "Security group for ${var.name_prefix}-sg"
vpc_id = module.vpc.vpc_id
ingress_cidr_blocks = ["0.0.0.0/0"]
ingress_rules = [
"kafka-broker-tcp",
"kafka-broker-tls-tcp"
"all-all"
]
egress_cidr_blocks = ["0.0.0.0/0"]
egress_rules = ["all-all"]
}

### MSK ###
module "ec2" {
source = "terraform-aws-modules/ec2-instance/aws"
name = "test-instance"
instance_type = "t2.micro"
subnet_id = module.vpc.public_subnets[0]
vpc_security_group_ids = [module.security_group.security_group_id]
create_iam_instance_profile = true
create_eip = true
iam_role_description = "IAM role for EC2 instance"
iam_role_policies = {
AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
AdministratorAccess = "arn:aws:iam::aws:policy/AdministratorAccess"
}
depends_on = [
module.msk_cluster
]
}

Expand Down Expand Up @@ -71,8 +90,8 @@ module "msk_cluster" {
sasl = { iam = true }
}
enable_storage_autoscaling = false
create_cloudwatch_log_group = false
cloudwatch_logs_enabled = false
create_cloudwatch_log_group = true
cloudwatch_logs_enabled = true
s3_logs_enabled = false
configuration_name = "${var.name_prefix}-msk-configuration"
configuration_description = "${var.name_prefix} MSK configuration"
Expand All @@ -92,22 +111,27 @@ resource "aws_msk_cluster_policy" "this" {
"Service" = "firehose.amazonaws.com"
}
Action = [
"kafka:Describe*",
"kafka:Get*",
"kafka:CreateVpcConnection",
"kafka:GetBootstrapBrokers",
"kafka:DescribeCluster",
"kafka:DescribeClusterV2"
]
Resource = module.msk_cluster.arn
}]
})
}

module "firehose" {
source = "../../../"
name = "${var.name_prefix}-delivery-stream"
input_source = "msk"
msk_source_cluster_arn = module.msk_cluster.arn
msk_source_topic_name = "test"
destination = "s3"
s3_bucket_arn = aws_s3_bucket.s3.arn
source = "../../../"
name = "${var.name_prefix}-delivery-stream"
input_source = "msk"
msk_source_cluster_arn = module.msk_cluster.arn
msk_source_connectivity_type = "PRIVATE"
msk_source_topic_name = local.topic
destination = "s3"
s3_bucket_arn = aws_s3_bucket.s3.arn
buffering_interval = 10
depends_on = [
aws_msk_cluster_policy.this
]
}
68 changes: 44 additions & 24 deletions examples/s3/msk-to-s3/outputs.tf
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
#output "kinesis_firehose_arn" {
# description = "The ARN of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_arn
#}
#
#output "kinesis_data_stream_name" {
# description = "The name of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_name
#}
#
#output "kinesis_firehose_destination_id" {
# description = "The Destination id of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_destination_id
#}
#
#output "kinesis_firehose_version_id" {
# description = "The Version id of the Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_version_id
#}
#
#output "kinesis_firehose_role_arn" {
# description = "The ARN of the IAM role created for Kinesis Firehose Stream"
# value = module.firehose.kinesis_firehose_role_arn
#}
output "msk_arn" {
description = "MSK Topic Endpoint"
value = module.msk_cluster.arn
}

output "kinesis_firehose_arn" {
description = "The ARN of the Kinesis Firehose Stream"
value = module.firehose.kinesis_firehose_arn
}

output "kinesis_data_stream_name" {
description = "The name of the Kinesis Firehose Stream"
value = module.firehose.kinesis_firehose_name
}

output "kinesis_firehose_destination_id" {
description = "The Destination id of the Kinesis Firehose Stream"
value = module.firehose.kinesis_firehose_destination_id
}

output "kinesis_firehose_version_id" {
description = "The Version id of the Kinesis Firehose Stream"
value = module.firehose.kinesis_firehose_version_id
}

output "kinesis_firehose_role_arn" {
description = "The ARN of the IAM role created for Kinesis Firehose Stream"
value = module.firehose.kinesis_firehose_role_arn
}

output "msk_brokers_endpoint" {
description = "Brokers endpoints"
value = module.msk_cluster.bootstrap_brokers
}

output "topic_name" {
description = "MSK Topic Name"
value = local.topic
}

output "s3_bucket_arn" {
description = "S3 Bucket ARN"
value = aws_s3_bucket.s3.arn
}
8 changes: 8 additions & 0 deletions examples/s3/msk-to-s3/providers.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
provider "aws" {
skip_metadata_api_check = true
skip_region_validation = true
skip_credentials_validation = true
default_tags {
tags = var.tags
}
}
6 changes: 6 additions & 0 deletions examples/s3/msk-to-s3/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,9 @@ variable "name_prefix" {
type = string
default = "msk-to-s3-basic"
}

variable "tags" {
description = "Default Tags to be added to all resources."
type = map(string)
default = {}
}
24 changes: 20 additions & 4 deletions iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ locals {
add_vpc_policy = var.create && var.create_role && var.enable_vpc && var.vpc_use_existing_role && local.is_search_destination
add_secretsmanager_policy = var.create && var.create_role && var.enable_secrets_manager
add_secretsmanager_decrypt_policy = local.add_secretsmanager_policy && var.secret_kms_key_arn != null
msk_source_topic_arn = local.add_msk_source_policy ? "arn:aws:kafka:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:topic/${data.aws_msk_cluster.this[0].cluster_name}/${data.aws_msk_cluster.this[0].cluster_uuid}/${var.msk_source_topic_name}" : null
msk_source_group_arn = local.add_msk_source_policy ? "arn:aws:kafka:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:group/${data.aws_msk_cluster.this[0].cluster_name}/${data.aws_msk_cluster.this[0].cluster_uuid}/*" : null
}

data "aws_iam_policy_document" "assume_role" {
count = var.create && var.create_role ? 1 : 0
count = var.create && var.create_role && !local.is_msk_source ? 1 : 0

statement {
effect = "Allow"
Expand All @@ -41,14 +43,28 @@ data "aws_iam_policy_document" "assume_role" {
}
}

data "aws_iam_policy_document" "msk_source_assume_role" {
count = var.create && var.create_role && local.is_msk_source ? 1 : 0

statement {
effect = "Allow"
actions = ["sts:AssumeRole"]

principals {
type = "Service"
identifiers = ["firehose.amazonaws.com"]
}
}
}

resource "aws_iam_role" "firehose" {
count = var.create && var.create_role ? 1 : 0
name = local.role_name
description = var.role_description
path = var.role_path
force_detach_policies = var.role_force_detach_policies
permissions_boundary = var.role_permissions_boundary
assume_role_policy = data.aws_iam_policy_document.assume_role[0].json
assume_role_policy = !local.is_msk_source ? data.aws_iam_policy_document.assume_role[0].json : data.aws_iam_policy_document.msk_source_assume_role[0].json
tags = merge(var.tags, var.role_tags)
}

Expand Down Expand Up @@ -128,7 +144,7 @@ data "aws_iam_policy_document" "msk" {
"kafka-cluster:ReadData"
]
resources = [
"${var.msk_source_cluster_arn}/${var.msk_source_topic_name}"
local.msk_source_topic_arn
]
}

Expand All @@ -138,7 +154,7 @@ data "aws_iam_policy_document" "msk" {
"kafka-cluster:DescribeGroup"
]
resources = [
"${var.msk_source_cluster_arn}/*"
local.msk_source_group_arn
]
}
}
Expand Down
5 changes: 5 additions & 0 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ data "aws_subnet" "subnet" {
id = var.vpc_subnet_ids[0]
}

data "aws_msk_cluster" "this" {
count = local.is_msk_source ? 1 : 0
cluster_name = split("/", var.msk_source_cluster_arn)[1]
}

resource "aws_kinesis_firehose_delivery_stream" "this" {
count = var.create ? 1 : 0
name = local.is_waf_source ? "aws-waf-logs-${var.name}" : var.name
Expand Down

0 comments on commit 8707bcb

Please sign in to comment.