fix(firehose): list all streams and fix firehose_stream_encrypted_at_rest logic (#8213)

This commit is contained in:
Hugo Pereira Brito
2025-07-09 09:38:54 +02:00
committed by GitHub
parent a3aef18cfe
commit ddc53c3c6d
5 changed files with 300 additions and 20 deletions

View File

@@ -21,6 +21,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Fixed
- fix(iam): detect wildcarded ARNs in sts:AssumeRole policy resources [(#8164)](https://github.com/prowler-cloud/prowler/pull/8164)
- fix(firehose): list all streams and fix `firehose_stream_encrypted_at_rest` logic [(#8213)](https://github.com/prowler-cloud/prowler/pull/8213)
- fix(ec2): allow empty values for http_endpoint in templates [(#8184)](https://github.com/prowler-cloud/prowler/pull/8184)
---

View File

@@ -25,18 +25,47 @@ class Firehose(AWSService):
def _list_delivery_streams(self, regional_client):
logger.info("Firehose - Listing delivery streams...")
try:
for stream_name in regional_client.list_delivery_streams()[
"DeliveryStreamNames"
]:
stream_arn = f"arn:{self.audited_partition}:firehose:{regional_client.region}:{self.audited_account}:deliverystream/{stream_name}"
if not self.audit_resources or (
is_resource_filtered(stream_arn, self.audit_resources)
):
self.delivery_streams[stream_arn] = DeliveryStream(
arn=stream_arn,
name=stream_name,
region=regional_client.region,
# Manual pagination using ExclusiveStartDeliveryStreamName
# This ensures we get all streams alphabetically without duplicates
exclusive_start_delivery_stream_name = None
processed_streams = set()
while True:
kwargs = {}
if exclusive_start_delivery_stream_name:
kwargs["ExclusiveStartDeliveryStreamName"] = (
exclusive_start_delivery_stream_name
)
response = regional_client.list_delivery_streams(**kwargs)
stream_names = response.get("DeliveryStreamNames", [])
for stream_name in stream_names:
if stream_name in processed_streams:
continue
processed_streams.add(stream_name)
stream_arn = f"arn:{self.audited_partition}:firehose:{regional_client.region}:{self.audited_account}:deliverystream/{stream_name}"
if not self.audit_resources or (
is_resource_filtered(stream_arn, self.audit_resources)
):
self.delivery_streams[stream_arn] = DeliveryStream(
arn=stream_arn,
name=stream_name,
region=regional_client.region,
)
if not response.get("HasMoreDeliveryStreams", False):
break
# Set the starting point for the next page (last stream name from current batch)
# ExclusiveStartDeliveryStreamName will start after this stream alphabetically
if stream_names:
exclusive_start_delivery_stream_name = stream_names[-1]
else:
break
except ClientError as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -61,13 +90,45 @@ class Firehose(AWSService):
describe_stream = self.regional_clients[
stream.region
].describe_delivery_stream(DeliveryStreamName=stream.name)
encryption_config = describe_stream.get(
"DeliveryStreamDescription", {}
).get("DeliveryStreamEncryptionConfiguration", {})
stream.kms_encryption = EncryptionStatus(
encryption_config.get("Status", "DISABLED")
)
stream.kms_key_arn = encryption_config.get("KeyARN", "")
stream.delivery_stream_type = describe_stream.get(
"DeliveryStreamDescription", {}
).get("DeliveryStreamType", "")
source_config = describe_stream.get("DeliveryStreamDescription", {}).get(
"Source", {}
)
stream.source = Source(
direct_put=DirectPutSourceDescription(
troughput_hint_in_mb_per_sec=source_config.get(
"DirectPutSourceDescription", {}
).get("TroughputHintInMBPerSec", 0)
),
kinesis_stream=KinesisStreamSourceDescription(
kinesis_stream_arn=source_config.get(
"KinesisStreamSourceDescription", {}
).get("KinesisStreamARN", "")
),
msk=MSKSourceDescription(
msk_cluster_arn=source_config.get("MSKSourceDescription", {}).get(
"MSKClusterARN", ""
)
),
database=DatabaseSourceDescription(
endpoint=source_config.get("DatabaseSourceDescription", {}).get(
"Endpoint", ""
)
),
)
except ClientError as error:
logger.error(
f"{stream.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -85,6 +146,39 @@ class EncryptionStatus(Enum):
DISABLING_FAILED = "DISABLING_FAILED"
class DirectPutSourceDescription(BaseModel):
    """Model for the DirectPut source of a Firehose stream"""

    # Throughput hint (MB per second) for DirectPut ingestion; defaults to 0
    # when absent from the DescribeDeliveryStream response.
    # NOTE(review): "troughput" looks like a typo for "throughput", and the
    # AWS response key being read elsewhere uses the same misspelling — so a
    # real API value may never match and this may always stay 0. Confirm the
    # exact AWS field name before renaming; callers reference this attribute.
    troughput_hint_in_mb_per_sec: int = Field(default_factory=int)
class KinesisStreamSourceDescription(BaseModel):
    """Describes the Kinesis Data Stream acting as a Firehose stream's source."""

    # ARN of the source Kinesis stream; empty string when not applicable.
    kinesis_stream_arn: str = ""
class MSKSourceDescription(BaseModel):
    """Describes the Amazon MSK cluster acting as a Firehose stream's source."""

    # ARN of the source MSK cluster; empty string when not applicable.
    msk_cluster_arn: str = ""
class DatabaseSourceDescription(BaseModel):
    """Describes the database acting as a Firehose stream's source."""

    # Endpoint of the source database; empty string when not applicable.
    endpoint: str = ""
class Source(BaseModel):
    """Model for the source of a Firehose stream.

    Groups the four possible source descriptions of a delivery stream
    (DirectPut, Kinesis stream, MSK cluster, database).
    """

    # Explicit None defaults: Pydantic v1 already defaults Optional fields to
    # None, but making it explicit keeps bare Source() constructible (the
    # DeliveryStream model builds one via default_factory=Source) and remains
    # correct under Pydantic v2, where an Optional field without a default is
    # required.
    direct_put: Optional[DirectPutSourceDescription] = None
    kinesis_stream: Optional[KinesisStreamSourceDescription] = None
    msk: Optional[MSKSourceDescription] = None
    database: Optional[DatabaseSourceDescription] = None
class DeliveryStream(BaseModel):
"""Model for a Firehose Delivery Stream"""
@@ -94,3 +188,5 @@ class DeliveryStream(BaseModel):
kms_key_arn: Optional[str] = Field(default_factory=str)
kms_encryption: Optional[str] = Field(default_factory=str)
tags: Optional[List[Dict[str, str]]] = Field(default_factory=list)
delivery_stream_type: Optional[str] = Field(default_factory=str)
source: Source = Field(default_factory=Source)

View File

@@ -3,6 +3,8 @@ from typing import List
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.firehose.firehose_client import firehose_client
from prowler.providers.aws.services.firehose.firehose_service import EncryptionStatus
from prowler.providers.aws.services.kinesis.kinesis_client import kinesis_client
from prowler.providers.aws.services.kinesis.kinesis_service import EncryptionType
class firehose_stream_encrypted_at_rest(Check):
@@ -22,14 +24,22 @@ class firehose_stream_encrypted_at_rest(Check):
findings = []
for stream in firehose_client.delivery_streams.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=stream)
report.status = "PASS"
report.status_extended = (
f"Firehose Stream {stream.name} does have at rest encryption enabled."
)
report.status = "FAIL"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled or the source stream is not encrypted."
if stream.kms_encryption != EncryptionStatus.ENABLED:
report.status = "FAIL"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled."
# Encrypted Kinesis Stream source
if stream.delivery_stream_type == "KinesisStreamAsSource":
source_stream = kinesis_client.streams.get(
stream.source.kinesis_stream.kinesis_stream_arn
)
if source_stream.encrypted_at_rest != EncryptionType.NONE:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
# Check if the stream has encryption enabled directly
elif stream.kms_encryption == EncryptionStatus.ENABLED:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does have at rest encryption enabled."
findings.append(report)

View File

@@ -2,8 +2,13 @@ from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.firehose.firehose_service import (
DatabaseSourceDescription,
DirectPutSourceDescription,
EncryptionStatus,
Firehose,
KinesisStreamSourceDescription,
MSKSourceDescription,
Source,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
@@ -152,3 +157,102 @@ class Test_Firehose_Service:
firehose.delivery_streams[arn].kms_key_arn
== f"arn:aws:kms:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:key/test-kms-key-id"
)
@mock_aws
def test_describe_delivery_stream_source_direct_put(self):
    """A DirectPut delivery stream exposes a fully populated Source model."""
    # Destination bucket for the delivery stream
    s3 = client("s3", region_name=AWS_REGION_EU_WEST_1)
    s3.create_bucket(
        Bucket="test-bucket",
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
    )

    # DirectPut delivery stream writing into the bucket above
    firehose_boto = client("firehose", region_name=AWS_REGION_EU_WEST_1)
    stream_arn = firehose_boto.create_delivery_stream(
        DeliveryStreamName="test-delivery-stream",
        DeliveryStreamType="DirectPut",
        S3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
            "BucketARN": "arn:aws:s3:::test-bucket",
            "Prefix": "",
            "BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
            "CompressionFormat": "UNCOMPRESSED",
        },
        Tags=[{"Key": "key", "Value": "value"}],
    )["DeliveryStreamARN"]

    # Scan with the service under the mocked provider
    provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
    service = Firehose(provider)

    assert len(service.delivery_streams) == 1
    stream = service.delivery_streams[stream_arn]
    assert stream.delivery_stream_type == "DirectPut"

    # Every source variant is materialized, regardless of stream type
    assert isinstance(stream.source, Source)
    assert isinstance(stream.source.direct_put, DirectPutSourceDescription)
    assert isinstance(
        stream.source.kinesis_stream, KinesisStreamSourceDescription
    )
    assert isinstance(stream.source.msk, MSKSourceDescription)
    assert isinstance(stream.source.database, DatabaseSourceDescription)
@mock_aws
def test_describe_delivery_stream_source_kinesis_stream(self):
    """A KinesisStreamAsSource delivery stream records the source stream ARN."""
    # Destination bucket: S3DestinationConfiguration below references it, so
    # create it first (parity with the DirectPut test; previously missing).
    s3_client = client("s3", region_name=AWS_REGION_EU_WEST_1)
    s3_client.create_bucket(
        Bucket="test-bucket",
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
    )

    # Source Kinesis stream
    kinesis_client = client("kinesis", region_name=AWS_REGION_EU_WEST_1)
    kinesis_client.create_stream(
        StreamName="test-kinesis-stream",
        ShardCount=1,
    )
    kinesis_stream_arn = f"arn:aws:kinesis:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:stream/test-kinesis-stream"

    # Firehose delivery stream fed by the Kinesis stream
    firehose_client = client("firehose", region_name=AWS_REGION_EU_WEST_1)
    delivery_stream = firehose_client.create_delivery_stream(
        DeliveryStreamName="test-delivery-stream",
        DeliveryStreamType="KinesisStreamAsSource",
        KinesisStreamSourceConfiguration={
            "KinesisStreamARN": kinesis_stream_arn,
            "RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
        },
        S3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
            "BucketARN": "arn:aws:s3:::test-bucket",
            "Prefix": "",
            "BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
            "CompressionFormat": "UNCOMPRESSED",
        },
        Tags=[{"Key": "key", "Value": "value"}],
    )
    arn = delivery_stream["DeliveryStreamARN"]

    # Firehose Client for this test class
    aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
    firehose = Firehose(aws_provider)

    assert len(firehose.delivery_streams) == 1
    assert (
        firehose.delivery_streams[arn].delivery_stream_type
        == "KinesisStreamAsSource"
    )
    # Test Source structure: the Kinesis source carries the stream ARN
    assert isinstance(firehose.delivery_streams[arn].source, Source)
    assert isinstance(
        firehose.delivery_streams[arn].source.kinesis_stream,
        KinesisStreamSourceDescription,
    )
    assert (
        firehose.delivery_streams[arn].source.kinesis_stream.kinesis_stream_arn
        == kinesis_stream_arn
    )

View File

@@ -198,7 +198,7 @@ class Test_firehose_stream_encrypted_at_rest:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
)
@mock_aws
@@ -253,5 +253,74 @@ class Test_firehose_stream_encrypted_at_rest:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled."
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
)
@mock_aws
def test_stream_kinesis_source_encrypted(self):
    """Check passes when the Firehose stream's Kinesis source is encrypted."""
    # Destination bucket: S3DestinationConfiguration below references it, so
    # create it first (previously referenced but never created).
    s3_client = client("s3", region_name=AWS_REGION_EU_WEST_1)
    s3_client.create_bucket(
        Bucket="test-bucket",
        CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
    )

    # Source Kinesis stream with SSE enabled
    kinesis_client = client("kinesis", region_name=AWS_REGION_EU_WEST_1)
    kinesis_client.create_stream(
        StreamName="test-kinesis-stream",
        ShardCount=1,
    )
    kinesis_stream_arn = f"arn:aws:kinesis:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:stream/test-kinesis-stream"
    kinesis_client.start_stream_encryption(
        StreamName="test-kinesis-stream",
        EncryptionType="KMS",
        KeyId=f"arn:aws:kms:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:key/test-kms-key-id",
    )

    # Firehose stream without its own SSE, sourced from the encrypted stream
    firehose_client = client("firehose", region_name=AWS_REGION_EU_WEST_1)
    delivery_stream = firehose_client.create_delivery_stream(
        DeliveryStreamName="test-delivery-stream",
        DeliveryStreamType="KinesisStreamAsSource",
        KinesisStreamSourceConfiguration={
            "KinesisStreamARN": kinesis_stream_arn,
            "RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
        },
        S3DestinationConfiguration={
            "RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
            "BucketARN": "arn:aws:s3:::test-bucket",
            "Prefix": "",
            "BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
            "CompressionFormat": "UNCOMPRESSED",
        },
        Tags=[{"Key": "key", "Value": "value"}],
    )
    arn = delivery_stream["DeliveryStreamARN"]
    stream_name = arn.split("/")[-1]

    # Imported here so moto's mocks are active before the services scan
    from prowler.providers.aws.services.firehose.firehose_service import Firehose
    from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis

    aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])

    # Single flat `with` instead of three nested blocks
    with mock.patch(
        "prowler.providers.common.provider.Provider.get_global_provider",
        return_value=aws_provider,
    ), mock.patch(
        "prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.firehose_client",
        new=Firehose(aws_provider),
    ), mock.patch(
        "prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.kinesis_client",
        new=Kinesis(aws_provider),
    ):
        # Test Check
        from prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest import (
            firehose_stream_encrypted_at_rest,
        )

        check = firehose_stream_encrypted_at_rest()
        result = check.execute()

        assert len(result) == 1
        assert result[0].status == "PASS"
        assert (
            result[0].status_extended
            == f"Firehose Stream {stream_name} does not have at rest encryption enabled but the source stream test-kinesis-stream has at rest encryption enabled."
        )