fix(firehose): false positive in firehose_stream_encrypted_at_rest (#8707)

Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
This commit is contained in:
Prowler Bot
2025-09-12 10:02:25 +02:00
committed by GitHub
parent 4dd6547b9c
commit ad4475efc9
18 changed files with 724 additions and 163 deletions

View File

@@ -6,6 +6,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Fixed
- Replaced old check id with new ones for compliance files [(#8682)](https://github.com/prowler-cloud/prowler/pull/8682)
- `firehose_stream_encrypted_at_rest` check false positives and new api call in kafka service [(#8599)](https://github.com/prowler-cloud/prowler/pull/8599)
- Replace defender rules policies key to use old name [(#8702)](https://github.com/prowler-cloud/prowler/pull/8702)
## [v5.12.0] (Prowler v5.12.0)

View File

@@ -3,6 +3,7 @@ from typing import List
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.firehose.firehose_client import firehose_client
from prowler.providers.aws.services.firehose.firehose_service import EncryptionStatus
from prowler.providers.aws.services.kafka.kafka_client import kafka_client
from prowler.providers.aws.services.kinesis.kinesis_client import kinesis_client
from prowler.providers.aws.services.kinesis.kinesis_service import EncryptionType
@@ -37,7 +38,28 @@ class firehose_stream_encrypted_at_rest(Check):
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
# Check if the stream has encryption enabled directly
# MSK source - check if the MSK cluster has encryption at rest with CMK
elif stream.delivery_stream_type == "MSKAsSource":
msk_cluster_arn = stream.source.msk.msk_cluster_arn
if msk_cluster_arn:
msk_cluster = None
for cluster in kafka_client.clusters.values():
if cluster.arn == msk_cluster_arn:
msk_cluster = cluster
break
if msk_cluster:
# All MSK clusters (both provisioned and serverless) always have encryption at rest enabled by AWS
# AWS MSK always encrypts data at rest - either with AWS managed keys or CMK
report.status = "PASS"
if msk_cluster.kafka_version == "SERVERLESS":
report.status_extended = f"Firehose Stream {stream.name} uses MSK serverless source which always has encryption at rest enabled by default."
else:
report.status_extended = f"Firehose Stream {stream.name} uses MSK provisioned source which always has encryption at rest enabled by AWS (either with AWS managed keys or CMK)."
else:
report.status_extended = f"Firehose Stream {stream.name} uses MSK source which always has encryption at rest enabled by AWS."
# Check if the stream has encryption enabled directly (DirectPut or DatabaseAsSource cases)
elif stream.kms_encryption == EncryptionStatus.ENABLED:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does have at rest encryption enabled."

View File

@@ -12,7 +12,12 @@ class kafka_cluster_encryption_at_rest_uses_cmk(Check):
report.status = "FAIL"
report.status_extended = f"Kafka cluster '{cluster.name}' does not have encryption at rest enabled with a CMK."
if any(
# Serverless clusters always have encryption at rest enabled by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has encryption at rest enabled by default."
# For provisioned clusters, check if they use a customer managed KMS key
elif any(
(
cluster.data_volume_kms_key_id == key.arn
and getattr(key, "manager", "") == "CUSTOMER"

View File

@@ -13,7 +13,12 @@ class kafka_cluster_enhanced_monitoring_enabled(Check):
f"Kafka cluster '{cluster.name}' has enhanced monitoring enabled."
)
if cluster.enhanced_monitoring == "DEFAULT":
# Serverless clusters always have enhanced monitoring enabled by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has enhanced monitoring enabled by default."
# For provisioned clusters, check the enhanced monitoring configuration
elif cluster.enhanced_monitoring == "DEFAULT":
report.status = "FAIL"
report.status_extended = f"Kafka cluster '{cluster.name}' does not have enhanced monitoring enabled."

View File

@@ -11,7 +11,12 @@ class kafka_cluster_in_transit_encryption_enabled(Check):
report.status = "FAIL"
report.status_extended = f"Kafka cluster '{cluster.name}' does not have encryption in transit enabled."
if (
# Serverless clusters always have encryption in transit enabled by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has encryption in transit enabled by default."
# For provisioned clusters, check the encryption configuration
elif (
cluster.encryption_in_transit.client_broker == "TLS"
and cluster.encryption_in_transit.in_cluster
):

View File

@@ -13,7 +13,12 @@ class kafka_cluster_is_public(Check):
f"Kafka cluster {cluster.name} is publicly accessible."
)
if not cluster.public_access:
# Serverless clusters are always private by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster {cluster.name} is serverless and always private by default."
# For provisioned clusters, check the public access configuration
elif not cluster.public_access:
report.status = "PASS"
report.status_extended = (
f"Kafka cluster {cluster.name} is not publicly accessible."

View File

@@ -11,7 +11,12 @@ class kafka_cluster_mutual_tls_authentication_enabled(Check):
report.status = "FAIL"
report.status_extended = f"Kafka cluster '{cluster.name}' does not have mutual TLS authentication enabled."
if cluster.tls_authentication:
# Serverless clusters always have TLS authentication enabled by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always has TLS authentication enabled by default."
# For provisioned clusters, check the TLS configuration
elif cluster.tls_authentication:
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' has mutual TLS authentication enabled."

View File

@@ -13,7 +13,12 @@ class kafka_cluster_unrestricted_access_disabled(Check):
f"Kafka cluster '{cluster.name}' has unrestricted access enabled."
)
if not cluster.unauthentication_access:
# Serverless clusters always require authentication by default
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and always requires authentication by default."
# For provisioned clusters, check the unauthenticated access configuration
elif not cluster.unauthentication_access:
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' does not have unrestricted access enabled."

View File

@@ -13,7 +13,12 @@ class kafka_cluster_uses_latest_version(Check):
f"Kafka cluster '{cluster.name}' is using the latest version."
)
if cluster.kafka_version != kafka_client.kafka_versions[-1].version:
# Serverless clusters don't have specific Kafka versions - AWS manages them automatically
if cluster.kafka_version == "SERVERLESS":
report.status = "PASS"
report.status_extended = f"Kafka cluster '{cluster.name}' is serverless and AWS automatically manages the Kafka version."
# For provisioned clusters, check if they're using the latest version
elif cluster.kafka_version != kafka_client.kafka_versions[-1].version:
report.status = "FAIL"
report.status_extended = (
f"Kafka cluster '{cluster.name}' is not using the latest version."

View File

@@ -15,61 +15,133 @@ class Kafka(AWSService):
self.__threading_call__(self._list_kafka_versions)
def _list_clusters(self, regional_client):
logger.info(f"Kafka - Listing clusters in region {regional_client.region}...")
try:
cluster_paginator = regional_client.get_paginator("list_clusters")
# Use list_clusters_v2 to support both provisioned and serverless clusters
cluster_paginator = regional_client.get_paginator("list_clusters_v2")
logger.info(
f"Kafka - Paginator created for region {regional_client.region}"
)
for page in cluster_paginator.paginate():
logger.info(
f"Kafka - Processing page with {len(page.get('ClusterInfoList', []))} clusters in region {regional_client.region}"
)
for cluster in page["ClusterInfoList"]:
logger.info(
f"Kafka - Found cluster: {cluster.get('ClusterName', 'Unknown')} in region {regional_client.region}"
)
arn = cluster.get(
"ClusterArn",
f"{self.account_arn_template}/{cluster.get('ClusterName', '')}",
)
cluster_type = cluster.get("ClusterType", "UNKNOWN")
if not self.audit_resources or is_resource_filtered(
arn, self.audit_resources
):
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
id=arn.split(":")[-1].split("/")[-1],
name=cluster.get("ClusterName", ""),
arn=arn,
region=regional_client.region,
tags=list(cluster.get("Tags", {})),
state=cluster.get("State", ""),
kafka_version=cluster.get(
"CurrentBrokerSoftwareInfo", {}
).get("KafkaVersion", ""),
data_volume_kms_key_id=cluster.get("EncryptionInfo", {})
.get("EncryptionAtRest", {})
.get("DataVolumeKMSKeyId", ""),
encryption_in_transit=EncryptionInTransit(
client_broker=cluster.get("EncryptionInfo", {})
.get("EncryptionInTransit", {})
.get("ClientBroker", "PLAINTEXT"),
in_cluster=cluster.get("EncryptionInfo", {})
.get("EncryptionInTransit", {})
.get("InCluster", False),
),
tls_authentication=cluster.get("ClientAuthentication", {})
.get("Tls", {})
.get("Enabled", False),
public_access=cluster.get("BrokerNodeGroupInfo", {})
.get("ConnectivityInfo", {})
.get("PublicAccess", {})
.get("Type", "SERVICE_PROVIDED_EIPS")
!= "DISABLED",
unauthentication_access=cluster.get(
"ClientAuthentication", {}
# Handle provisioned clusters
if cluster_type == "PROVISIONED" and "Provisioned" in cluster:
provisioned = cluster["Provisioned"]
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
id=arn.split(":")[-1].split("/")[-1],
name=cluster.get("ClusterName", ""),
arn=arn,
region=regional_client.region,
tags=(
list(cluster.get("Tags", {}).values())
if cluster.get("Tags")
else []
),
state=cluster.get("State", ""),
kafka_version=provisioned.get(
"CurrentBrokerSoftwareInfo", {}
).get("KafkaVersion", ""),
data_volume_kms_key_id=provisioned.get(
"EncryptionInfo", {}
)
.get("EncryptionAtRest", {})
.get("DataVolumeKMSKeyId", ""),
encryption_in_transit=EncryptionInTransit(
client_broker=provisioned.get("EncryptionInfo", {})
.get("EncryptionInTransit", {})
.get("ClientBroker", "PLAINTEXT"),
in_cluster=provisioned.get("EncryptionInfo", {})
.get("EncryptionInTransit", {})
.get("InCluster", False),
),
tls_authentication=provisioned.get(
"ClientAuthentication", {}
)
.get("Tls", {})
.get("Enabled", False),
public_access=provisioned.get("BrokerNodeGroupInfo", {})
.get("ConnectivityInfo", {})
.get("PublicAccess", {})
.get("Type", "SERVICE_PROVIDED_EIPS")
!= "DISABLED",
unauthentication_access=provisioned.get(
"ClientAuthentication", {}
)
.get("Unauthenticated", {})
.get("Enabled", False),
enhanced_monitoring=provisioned.get(
"EnhancedMonitoring", "DEFAULT"
),
)
.get("Unauthenticated", {})
.get("Enabled", False),
enhanced_monitoring=cluster.get(
"EnhancedMonitoring", "DEFAULT"
),
logger.info(
f"Kafka - Added provisioned cluster {cluster.get('ClusterName', 'Unknown')} to clusters dict"
)
# Handle serverless clusters
elif cluster_type == "SERVERLESS" and "Serverless" in cluster:
# For serverless clusters, encryption is always enabled by default
# We'll create a Cluster object with default encryption values
self.clusters[cluster.get("ClusterArn", "")] = Cluster(
id=arn.split(":")[-1].split("/")[-1],
name=cluster.get("ClusterName", ""),
arn=arn,
region=regional_client.region,
tags=(
list(cluster.get("Tags", {}).values())
if cluster.get("Tags")
else []
),
state=cluster.get("State", ""),
kafka_version="SERVERLESS", # Serverless doesn't have specific Kafka version
data_volume_kms_key_id="AWS_MANAGED", # Serverless uses AWS managed keys
encryption_in_transit=EncryptionInTransit(
client_broker="TLS", # Serverless always has TLS enabled
in_cluster=True, # Serverless always has in-cluster encryption
),
tls_authentication=True, # Serverless always has TLS authentication
public_access=False, # Serverless clusters are always private
unauthentication_access=False, # Serverless requires authentication
enhanced_monitoring="DEFAULT",
)
logger.info(
f"Kafka - Added serverless cluster {cluster.get('ClusterName', 'Unknown')} to clusters dict"
)
else:
logger.warning(
f"Kafka - Unknown cluster type {cluster_type} for cluster {cluster.get('ClusterName', 'Unknown')}"
)
else:
logger.info(
f"Kafka - Cluster {cluster.get('ClusterName', 'Unknown')} filtered out by audit_resources"
)
logger.info(
f"Kafka - Total clusters found in region {regional_client.region}: {len(self.clusters)}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.error(
f"Kafka - Error details in region {regional_client.region}: {str(error)}"
)
def _list_kafka_versions(self, regional_client):
try:

View File

@@ -162,3 +162,64 @@ class Test_kafka_cluster_encryption_at_rest_uses_cmk:
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
def test_kafka_cluster_serverless_encryption_at_rest(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
kms_client = MagicMock
kms_client.keys = []
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_cluster_encryption_at_rest_uses_cmk.kafka_cluster_encryption_at_rest_uses_cmk.kms_client",
new=kms_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_encryption_at_rest_uses_cmk.kafka_cluster_encryption_at_rest_uses_cmk import (
kafka_cluster_encryption_at_rest_uses_cmk,
)
check = kafka_cluster_encryption_at_rest_uses_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and always has encryption at rest enabled by default."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
Cluster,
EncryptionInTransit,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_enhanced_monitoring_enabled:
@@ -14,11 +14,11 @@ class Test_kafka_cluster_enhanced_monitoring_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -56,11 +56,11 @@ class Test_kafka_cluster_enhanced_monitoring_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -110,11 +110,11 @@ class Test_kafka_cluster_enhanced_monitoring_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -138,3 +138,57 @@ class Test_kafka_cluster_enhanced_monitoring_enabled:
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
def test_kafka_cluster_serverless_enhanced_monitoring(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_enhanced_monitoring_enabled.kafka_cluster_enhanced_monitoring_enabled import (
kafka_cluster_enhanced_monitoring_enabled,
)
check = kafka_cluster_enhanced_monitoring_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and always has enhanced monitoring enabled by default."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
Cluster,
EncryptionInTransit,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_in_transit_encryption_enabled:
@@ -14,11 +14,11 @@ class Test_kafka_cluster_in_transit_encryption_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -56,11 +56,11 @@ class Test_kafka_cluster_in_transit_encryption_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -110,11 +110,11 @@ class Test_kafka_cluster_in_transit_encryption_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -164,11 +164,11 @@ class Test_kafka_cluster_in_transit_encryption_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -191,3 +191,57 @@ class Test_kafka_cluster_in_transit_encryption_enabled:
== "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
)
assert result[0].region == AWS_REGION_US_EAST_1
def test_kafka_cluster_serverless_in_transit_encryption(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_in_transit_encryption_enabled.kafka_cluster_in_transit_encryption_enabled import (
kafka_cluster_in_transit_encryption_enabled,
)
check = kafka_cluster_in_transit_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and always has encryption in transit enabled by default."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
Cluster,
EncryptionInTransit,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_is_public:
@@ -14,11 +14,11 @@ class Test_kafka_cluster_is_public:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -56,11 +56,11 @@ class Test_kafka_cluster_is_public:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -110,11 +110,11 @@ class Test_kafka_cluster_is_public:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -138,3 +138,57 @@ class Test_kafka_cluster_is_public:
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
def test_kafka_cluster_serverless_public(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_is_public.kafka_cluster_is_public import (
kafka_cluster_is_public,
)
check = kafka_cluster_is_public()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster serverless-cluster-1 is serverless and always private by default."
)
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
Cluster,
EncryptionInTransit,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_mutual_tls_authentication_enabled:
@@ -14,11 +14,11 @@ class Test_kafka_cluster_mutual_tls_authentication_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -56,11 +56,11 @@ class Test_kafka_cluster_mutual_tls_authentication_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -110,11 +110,11 @@ class Test_kafka_cluster_mutual_tls_authentication_enabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -138,3 +138,57 @@ class Test_kafka_cluster_mutual_tls_authentication_enabled:
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
def test_kafka_cluster_serverless_mutual_tls_authentication(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_mutual_tls_authentication_enabled.kafka_cluster_mutual_tls_authentication_enabled import (
kafka_cluster_mutual_tls_authentication_enabled,
)
check = kafka_cluster_mutual_tls_authentication_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and always has TLS authentication enabled by default."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
Cluster,
EncryptionInTransit,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_unrestricted_access_disabled:
@@ -14,11 +14,11 @@ class Test_kafka_cluster_unrestricted_access_disabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -56,11 +56,11 @@ class Test_kafka_cluster_unrestricted_access_disabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -110,11 +110,11 @@ class Test_kafka_cluster_unrestricted_access_disabled:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -138,3 +138,57 @@ class Test_kafka_cluster_unrestricted_access_disabled:
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
def test_kafka_cluster_serverless_unrestricted_access_disabled(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_unrestricted_access_disabled.kafka_cluster_unrestricted_access_disabled import (
kafka_cluster_unrestricted_access_disabled,
)
check = kafka_cluster_unrestricted_access_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and always requires authentication by default."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []

View File

@@ -5,7 +5,7 @@ from prowler.providers.aws.services.kafka.kafka_service import (
EncryptionInTransit,
KafkaVersion,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
class Test_kafka_cluster_latest_version:
@@ -15,11 +15,11 @@ class Test_kafka_cluster_latest_version:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -62,11 +62,11 @@ class Test_kafka_cluster_latest_version:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -121,11 +121,11 @@ class Test_kafka_cluster_latest_version:
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider([AWS_REGION_US_EAST_1]),
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
@@ -149,3 +149,62 @@ class Test_kafka_cluster_latest_version:
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1
def test_kafka_cluster_serverless_uses_latest_version(self):
kafka_client = MagicMock
kafka_client.clusters = {
"arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6": Cluster(
id="6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
name="serverless-cluster-1",
arn="arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
region=AWS_REGION_US_EAST_1,
tags=[],
state="ACTIVE",
kafka_version="SERVERLESS",
data_volume_kms_key_id="AWS_MANAGED",
encryption_in_transit=EncryptionInTransit(
client_broker="TLS",
in_cluster=True,
),
tls_authentication=True,
public_access=False,
unauthentication_access=False,
enhanced_monitoring="DEFAULT",
)
}
kafka_client.kafka_versions = [
KafkaVersion(version="1.0.0", status="DEPRECATED"),
KafkaVersion(version="2.8.0", status="ACTIVE"),
]
with (
patch(
"prowler.providers.aws.services.kafka.kafka_service.Kafka",
new=kafka_client,
),
patch(
"prowler.providers.aws.services.kafka.kafka_client.kafka_client",
new=kafka_client,
),
):
from prowler.providers.aws.services.kafka.kafka_cluster_uses_latest_version.kafka_cluster_uses_latest_version import (
kafka_cluster_uses_latest_version,
)
check = kafka_cluster_uses_latest_version()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Kafka cluster 'serverless-cluster-1' is serverless and AWS automatically manages the Kafka version."
)
assert result[0].resource_id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert (
result[0].resource_arn
== "arn:aws:kafka:us-east-1:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_US_EAST_1

View File

@@ -13,47 +13,67 @@ make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListClusters":
if operation_name == "ListClustersV2":
return {
"ClusterInfoList": [
{
"BrokerNodeGroupInfo": {
"BrokerAZDistribution": "DEFAULT",
"ClientSubnets": ["subnet-cbfff283", "subnet-6746046b"],
"InstanceType": "kafka.m5.large",
"SecurityGroups": ["sg-f839b688"],
"StorageInfo": {"EbsStorageInfo": {"VolumeSize": 100}},
},
"ClusterType": "PROVISIONED",
"ClusterArn": f"arn:aws:kafka:{AWS_REGION_US_EAST_1}:123456789012:cluster/demo-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5",
"ClusterName": "demo-cluster-1",
"CreationTime": "2020-07-09T02:31:36.223000+00:00",
"CurrentBrokerSoftwareInfo": {"KafkaVersion": "2.2.1"},
"CurrentVersion": "K3AEGXETSR30VB",
"EncryptionInfo": {
"EncryptionAtRest": {
"DataVolumeKMSKeyId": f"arn:aws:kms:{AWS_REGION_US_EAST_1}:123456789012:key/a7ca56d5-0768-4b64-a670-339a9fbef81c"
},
"EncryptionInTransit": {
"ClientBroker": "TLS_PLAINTEXT",
"InCluster": True,
},
},
"ClientAuthentication": {
"Tls": {"CertificateAuthorityArnList": [], "Enabled": True},
"Unauthenticated": {"Enabled": False},
},
"EnhancedMonitoring": "DEFAULT",
"OpenMonitoring": {
"Prometheus": {
"JmxExporter": {"EnabledInBroker": False},
"NodeExporter": {"EnabledInBroker": False},
}
},
"NumberOfBrokerNodes": 2,
"State": "ACTIVE",
"Tags": {},
"ZookeeperConnectString": f"z-2.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181,z-1.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181,z-3.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181",
}
"Provisioned": {
"BrokerNodeGroupInfo": {
"BrokerAZDistribution": "DEFAULT",
"ClientSubnets": ["subnet-cbfff283", "subnet-6746046b"],
"InstanceType": "kafka.m5.large",
"SecurityGroups": ["sg-f839b688"],
"StorageInfo": {"EbsStorageInfo": {"VolumeSize": 100}},
"ConnectivityInfo": {
"PublicAccess": {"Type": "SERVICE_PROVIDED_EIPS"}
},
},
"CurrentBrokerSoftwareInfo": {"KafkaVersion": "2.2.1"},
"CurrentVersion": "K3AEGXETSR30VB",
"EncryptionInfo": {
"EncryptionAtRest": {
"DataVolumeKMSKeyId": f"arn:aws:kms:{AWS_REGION_US_EAST_1}:123456789012:key/a7ca56d5-0768-4b64-a670-339a9fbef81c"
},
"EncryptionInTransit": {
"ClientBroker": "TLS_PLAINTEXT",
"InCluster": True,
},
},
"ClientAuthentication": {
"Tls": {"CertificateAuthorityArnList": [], "Enabled": True},
"Unauthenticated": {"Enabled": False},
},
"EnhancedMonitoring": "DEFAULT",
"OpenMonitoring": {
"Prometheus": {
"JmxExporter": {"EnabledInBroker": False},
"NodeExporter": {"EnabledInBroker": False},
}
},
"NumberOfBrokerNodes": 2,
"ZookeeperConnectString": f"z-2.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181,z-1.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181,z-3.demo-cluster-1.xuy0sb.c5.kafka.{AWS_REGION_US_EAST_1}.amazonaws.com:2181",
},
},
{
"ClusterType": "SERVERLESS",
"ClusterArn": f"arn:aws:kafka:{AWS_REGION_US_EAST_1}:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6",
"ClusterName": "serverless-cluster-1",
"State": "ACTIVE",
"Tags": {},
"Serverless": {
"VpcConfigs": [
{
"SubnetIds": ["subnet-cbfff283", "subnet-6746046b"],
"SecurityGroups": ["sg-f839b688"],
}
],
},
},
]
}
elif operation_name == "ListKafkaVersions":
@@ -86,32 +106,53 @@ class TestKafkaService:
assert kafka.__class__.__name__ == "Kafka"
assert kafka.session.__class__.__name__ == "Session"
assert kafka.audited_account == AWS_ACCOUNT_NUMBER
# Clusters assertions
assert len(kafka.clusters) == 1
cluster_arn = f"arn:aws:kafka:{AWS_REGION_US_EAST_1}:123456789012:cluster/demo-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
assert cluster_arn in kafka.clusters
# Clusters assertions - should now include both provisioned and serverless
assert len(kafka.clusters) == 2
# Check provisioned cluster
provisioned_cluster_arn = f"arn:aws:kafka:{AWS_REGION_US_EAST_1}:123456789012:cluster/demo-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
assert provisioned_cluster_arn in kafka.clusters
provisioned_cluster = kafka.clusters[provisioned_cluster_arn]
assert provisioned_cluster.id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
assert provisioned_cluster.arn == provisioned_cluster_arn
assert provisioned_cluster.name == "demo-cluster-1"
assert provisioned_cluster.region == AWS_REGION_US_EAST_1
assert provisioned_cluster.tags == []
assert provisioned_cluster.state == "ACTIVE"
assert provisioned_cluster.kafka_version == "2.2.1"
assert (
kafka.clusters[cluster_arn].id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-5"
)
assert kafka.clusters[cluster_arn].arn == cluster_arn
assert kafka.clusters[cluster_arn].name == "demo-cluster-1"
assert kafka.clusters[cluster_arn].region == AWS_REGION_US_EAST_1
assert kafka.clusters[cluster_arn].tags == []
assert kafka.clusters[cluster_arn].state == "ACTIVE"
assert kafka.clusters[cluster_arn].kafka_version == "2.2.1"
assert (
kafka.clusters[cluster_arn].data_volume_kms_key_id
provisioned_cluster.data_volume_kms_key_id
== f"arn:aws:kms:{AWS_REGION_US_EAST_1}:123456789012:key/a7ca56d5-0768-4b64-a670-339a9fbef81c"
)
assert (
kafka.clusters[cluster_arn].encryption_in_transit.client_broker
== "TLS_PLAINTEXT"
provisioned_cluster.encryption_in_transit.client_broker == "TLS_PLAINTEXT"
)
assert kafka.clusters[cluster_arn].encryption_in_transit.in_cluster
assert kafka.clusters[cluster_arn].enhanced_monitoring == "DEFAULT"
assert kafka.clusters[cluster_arn].tls_authentication
assert kafka.clusters[cluster_arn].public_access
assert not kafka.clusters[cluster_arn].unauthentication_access
assert provisioned_cluster.encryption_in_transit.in_cluster
assert provisioned_cluster.enhanced_monitoring == "DEFAULT"
assert provisioned_cluster.tls_authentication
assert provisioned_cluster.public_access
assert not provisioned_cluster.unauthentication_access
# Check serverless cluster
serverless_cluster_arn = f"arn:aws:kafka:{AWS_REGION_US_EAST_1}:123456789012:cluster/serverless-cluster-1/6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert serverless_cluster_arn in kafka.clusters
serverless_cluster = kafka.clusters[serverless_cluster_arn]
assert serverless_cluster.id == "6357e0b2-0e6a-4b86-a0b4-70df934c2e31-6"
assert serverless_cluster.arn == serverless_cluster_arn
assert serverless_cluster.name == "serverless-cluster-1"
assert serverless_cluster.region == AWS_REGION_US_EAST_1
assert serverless_cluster.tags == []
assert serverless_cluster.state == "ACTIVE"
assert serverless_cluster.kafka_version == "SERVERLESS"
assert serverless_cluster.data_volume_kms_key_id == "AWS_MANAGED"
assert serverless_cluster.encryption_in_transit.client_broker == "TLS"
assert serverless_cluster.encryption_in_transit.in_cluster
assert serverless_cluster.enhanced_monitoring == "DEFAULT"
assert serverless_cluster.tls_authentication
assert not serverless_cluster.public_access
assert not serverless_cluster.unauthentication_access
# Kafka versions assertions
assert len(kafka.kafka_versions) == 2
assert kafka.kafka_versions[0].version == "1.0.0"