fix(aws): firehose_stream_encrypted_at_rest description and logic (#9142)

This commit is contained in:
Hugo Pereira Brito
2025-11-03 17:31:18 +01:00
committed by GitHub
parent 8b0b9cad32
commit 61a66f2bbf
4 changed files with 159 additions and 22 deletions

View File

@@ -43,6 +43,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Fix `ec2_instance_with_outdated_ami` check to handle None AMIs [(#9046)](https://github.com/prowler-cloud/prowler/pull/9046)
- Handle timestamp when transforming compliance findings in CCC [(#9042)](https://github.com/prowler-cloud/prowler/pull/9042)
- Update `resource_id` for admincenter service and avoid unnecessary msgraph requests [(#9019)](https://github.com/prowler-cloud/prowler/pull/9019)
- Fix `firehose_stream_encrypted_at_rest` description and findings clarity [(#9142)](https://github.com/prowler-cloud/prowler/pull/9142)
---

View File

@@ -13,7 +13,7 @@
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsKinesisStream",
"Description": "**Amazon Data Firehose** delivery streams use **server-side encryption at rest** with AWS KMS. For `DirectPut` or database sources, the stream must have KMS encryption enabled. When the source is **Kinesis Data Streams**, the source's encryption is considered. For **MSK** sources, encryption at rest is always applied by AWS.",
"Description": "**Amazon Data Firehose** delivery streams must enable **server-side encryption at rest** with AWS KMS regardless of the source type. Encryption of upstream sources such as **Kinesis Data Streams** or **MSK** does not replace the need to protect the delivery stream itself.",
"Risk": "Unencrypted Firehose data at rest can be read if storage or backups are accessed, harming **confidentiality** and **integrity**. Disk-level access, snapshots, or misconfigured destinations enable data exfiltration or tampering. Lacking KMS-backed controls also reduces key rotation, segregation of duties, and auditability.",
"RelatedUrl": "",
"AdditionalURLs": [

View File

@@ -26,23 +26,27 @@ class firehose_stream_encrypted_at_rest(Check):
for stream in firehose_client.delivery_streams.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=stream)
report.status = "FAIL"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled or the source stream is not encrypted."
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled."
# Encrypted Kinesis Stream source
if stream.delivery_stream_type == "KinesisStreamAsSource":
source_stream = kinesis_client.streams.get(
stream.source.kinesis_stream.kinesis_stream_arn
)
if stream.kms_encryption == EncryptionStatus.ENABLED:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does have at rest encryption enabled."
elif stream.delivery_stream_type == "KinesisStreamAsSource":
source_stream_arn = stream.source.kinesis_stream.kinesis_stream_arn
source_stream = kinesis_client.streams.get(source_stream_arn, None)
if source_stream:
if source_stream.encrypted_at_rest != EncryptionType.NONE:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
if source_stream.encrypted_at_rest == EncryptionType.KMS:
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled even though source stream {source_stream.name} has at rest encryption enabled."
else:
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled and the source stream {source_stream.name} is not encrypted at rest."
else:
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled and the referenced source stream could not be found."
# MSK source - check if the MSK cluster has encryption at rest with CMK
elif stream.delivery_stream_type == "MSKAsSource":
msk_cluster_arn = stream.source.msk.msk_cluster_arn
msk_cluster = None
if msk_cluster_arn:
msk_cluster = None
for cluster in kafka_client.clusters.values():
if cluster.arn == msk_cluster_arn:
msk_cluster = cluster
@@ -59,11 +63,6 @@ class firehose_stream_encrypted_at_rest(Check):
else:
report.status_extended = f"Firehose Stream {stream.name} uses MSK source which always has encryption at rest enabled by AWS."
# Check if the stream has encryption enabled directly (DirectPut or DatabaseAsSource cases)
elif stream.kms_encryption == EncryptionStatus.ENABLED:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does have at rest encryption enabled."
findings.append(report)
return findings

View File

@@ -198,7 +198,7 @@ class Test_firehose_stream_encrypted_at_rest:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled."
)
@mock_aws
@@ -253,7 +253,7 @@ class Test_firehose_stream_encrypted_at_rest:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled."
)
@mock_aws
@@ -319,12 +319,149 @@ class Test_firehose_stream_encrypted_at_rest:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled but the source stream test-kinesis-stream has at rest encryption enabled."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled even though source stream test-kinesis-stream has at rest encryption enabled."
)
@mock_aws
def test_stream_kinesis_source_encrypted_firehose_encryption_enabled(self):
# Generate Kinesis client
kinesis_client = client("kinesis", region_name=AWS_REGION_EU_WEST_1)
kinesis_client.create_stream(
StreamName="test-kinesis-stream",
ShardCount=1,
)
kinesis_stream_arn = f"arn:aws:kinesis:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:stream/test-kinesis-stream"
# Enable encryption on the Kinesis stream
kinesis_client.start_stream_encryption(
StreamName="test-kinesis-stream",
EncryptionType="KMS",
KeyId=f"arn:aws:kms:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:key/test-kms-key-id",
)
# Generate Firehose client
firehose = client("firehose", region_name=AWS_REGION_EU_WEST_1)
delivery_stream = firehose.create_delivery_stream(
DeliveryStreamName="test-delivery-stream",
DeliveryStreamType="KinesisStreamAsSource",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": kinesis_stream_arn,
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
},
S3DestinationConfiguration={
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
"BucketARN": "arn:aws:s3:::test-bucket",
"Prefix": "",
"BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
"CompressionFormat": "UNCOMPRESSED",
},
)
stream_name = delivery_stream["DeliveryStreamARN"].split("/")[-1]
firehose.start_delivery_stream_encryption(
DeliveryStreamName=stream_name,
DeliveryStreamEncryptionConfigurationInput={
"KeyType": "AWS_OWNED_CMK",
},
)
from prowler.providers.aws.services.firehose.firehose_service import Firehose
from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.firehose_client",
new=Firehose(aws_provider),
),
mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.kinesis_client",
new=Kinesis(aws_provider),
),
):
from prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest import (
firehose_stream_encrypted_at_rest,
)
check = firehose_stream_encrypted_at_rest()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does have at rest encryption enabled."
)
@mock_aws
def test_stream_kinesis_source_not_encrypted(self):
# Generate Kinesis client
kinesis_client = client("kinesis", region_name=AWS_REGION_EU_WEST_1)
kinesis_client.create_stream(
StreamName="test-kinesis-stream",
ShardCount=1,
)
kinesis_stream_arn = f"arn:aws:kinesis:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:stream/test-kinesis-stream"
# Generate Firehose client
firehose_client = client("firehose", region_name=AWS_REGION_EU_WEST_1)
delivery_stream = firehose_client.create_delivery_stream(
DeliveryStreamName="test-delivery-stream",
DeliveryStreamType="KinesisStreamAsSource",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": kinesis_stream_arn,
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
},
S3DestinationConfiguration={
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
"BucketARN": "arn:aws:s3:::test-bucket",
"Prefix": "",
"BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
"CompressionFormat": "UNCOMPRESSED",
},
)
stream_name = delivery_stream["DeliveryStreamARN"].split("/")[-1]
from prowler.providers.aws.services.firehose.firehose_service import Firehose
from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.firehose_client",
new=Firehose(aws_provider),
),
mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.kinesis_client",
new=Kinesis(aws_provider),
),
):
from prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest import (
firehose_stream_encrypted_at_rest,
)
check = firehose_stream_encrypted_at_rest()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled and the source stream test-kinesis-stream is not encrypted at rest."
)
@mock_aws
def test_stream_kinesis_source_not_found(self):
"""Test case when Kinesis source stream is not found - should handle None gracefully"""
@@ -377,5 +514,5 @@ class Test_firehose_stream_encrypted_at_rest:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled and the referenced source stream could not be found."
)