fix(cloudfront): fix false positive in s3 origins (#6823)

(cherry picked from commit 914012de2b)

# Conflicts:
#	prowler/providers/aws/services/cloudfront/cloudfront_distributions_origin_traffic_encrypted/cloudfront_distributions_origin_traffic_encrypted.py
#	tests/providers/aws/services/cloudfront/cloudfront_distributions_origin_traffic_encrypted/cloudfront_distributions_origin_traffic_encrypted_test.py
This commit is contained in:
Daniel Barranquero
2025-02-05 18:39:49 +01:00
parent 5aee8b7880
commit c7f6a1c78f
2 changed files with 316 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.cloudfront.cloudfront_client import (
cloudfront_client,
)
class cloudfront_distributions_origin_traffic_encrypted(Check):
def execute(self):
findings = []
for distribution in cloudfront_client.distributions.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=distribution)
report.status = "PASS"
report.status_extended = f"CloudFront Distribution {distribution.id} does encrypt traffic to custom origins."
unencrypted_origins = []
for origin in distribution.origins:
if origin.s3_origin_config:
# For S3, only check the viewer protocol policy
if distribution.viewer_protocol_policy == "allow-all":
unencrypted_origins.append(origin.id)
else:
# Regular check for custom origins (ALB, EC2, API Gateway, etc.)
if (
origin.origin_protocol_policy == ""
or origin.origin_protocol_policy == "http-only"
) or (
origin.origin_protocol_policy == "match-viewer"
and distribution.viewer_protocol_policy == "allow-all"
):
unencrypted_origins.append(origin.id)
if unencrypted_origins:
report.status = "FAIL"
report.status_extended = f"CloudFront Distribution {distribution.id} does not encrypt traffic to custom origins {', '.join(unencrypted_origins)}."
findings.append(report)
return findings

View File

@@ -0,0 +1,278 @@
from unittest import mock
from prowler.providers.aws.services.cloudfront.cloudfront_service import (
DefaultCacheConfigBehaviour,
Distribution,
Origin,
ViewerProtocolPolicy,
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER
DISTRIBUTION_ID = "E27LVI50CSW06W"
DISTRIBUTION_ARN = (
f"arn:aws:cloudfront::{AWS_ACCOUNT_NUMBER}:distribution/{DISTRIBUTION_ID}"
)
REGION = "eu-west-1"
class Test_cloudfront_distributions_origin_traffic_encrypted:
def test_no_distributions(self):
cloudfront_client = mock.MagicMock
cloudfront_client.distributions = {}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 0
def test_distribution_no_traffic_encryption(self):
cloudfront_client = mock.MagicMock
id = "origin1"
cloudfront_client.distributions = {
DISTRIBUTION_ID: Distribution(
arn=DISTRIBUTION_ARN,
id=DISTRIBUTION_ID,
region=REGION,
origins=[
Origin(
id=id,
domain_name="asdf.s3.us-east-1.amazonaws.com",
origin_protocol_policy="",
origin_ssl_protocols=[],
)
],
default_cache_config=DefaultCacheConfigBehaviour(
realtime_log_config_arn="",
viewer_protocol_policy=ViewerProtocolPolicy.allow_all,
field_level_encryption_id="",
),
default_root_object="",
viewer_protocol_policy="",
)
}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].region == REGION
assert result[0].resource_arn == DISTRIBUTION_ARN
assert result[0].resource_id == DISTRIBUTION_ID
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} does not encrypt traffic to custom origins {id}."
)
assert result[0].resource_tags == []
def test_distribution_http_only(self):
cloudfront_client = mock.MagicMock
id = "origin1"
cloudfront_client.distributions = {
DISTRIBUTION_ID: Distribution(
arn=DISTRIBUTION_ARN,
id=DISTRIBUTION_ID,
region=REGION,
origins=[
Origin(
id=id,
domain_name="asdf.s3.us-east-1.amazonaws.com",
origin_protocol_policy="http-only",
origin_ssl_protocols=[],
)
],
default_cache_config=DefaultCacheConfigBehaviour(
realtime_log_config_arn="",
viewer_protocol_policy=ViewerProtocolPolicy.allow_all,
field_level_encryption_id="",
),
default_root_object="index.html",
)
}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].region == REGION
assert result[0].resource_arn == DISTRIBUTION_ARN
assert result[0].resource_id == DISTRIBUTION_ID
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} does not encrypt traffic to custom origins {id}."
)
assert result[0].resource_tags == []
def test_distribution_match_viewer_allow_all(self):
cloudfront_client = mock.MagicMock
id = "origin1"
cloudfront_client.distributions = {
DISTRIBUTION_ID: Distribution(
arn=DISTRIBUTION_ARN,
id=DISTRIBUTION_ID,
region=REGION,
origins=[
Origin(
id=id,
domain_name="asdf.s3.us-east-1.amazonaws.com",
origin_protocol_policy="match-viewer",
origin_ssl_protocols=[],
)
],
default_cache_config=DefaultCacheConfigBehaviour(
realtime_log_config_arn="",
viewer_protocol_policy=ViewerProtocolPolicy.allow_all,
field_level_encryption_id="",
),
default_root_object="index.html",
viewer_protocol_policy="allow-all",
)
}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].region == REGION
assert result[0].resource_arn == DISTRIBUTION_ARN
assert result[0].resource_id == DISTRIBUTION_ID
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} does not encrypt traffic to custom origins {id}."
)
assert result[0].resource_tags == []
def test_distribution_traffic_encrypted(self):
cloudfront_client = mock.MagicMock
cloudfront_client.distributions = {
DISTRIBUTION_ID: Distribution(
arn=DISTRIBUTION_ARN,
id=DISTRIBUTION_ID,
region=REGION,
origins=[
Origin(
id="origin1",
domain_name="asdf.s3.us-east-1.amazonaws.com",
origin_protocol_policy="https-only",
origin_ssl_protocols=[],
)
],
default_cache_config=DefaultCacheConfigBehaviour(
realtime_log_config_arn="",
viewer_protocol_policy=ViewerProtocolPolicy.allow_all,
field_level_encryption_id="",
),
default_root_object="index.html",
)
}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].region == REGION
assert result[0].resource_arn == DISTRIBUTION_ARN
assert result[0].resource_id == DISTRIBUTION_ID
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} does encrypt traffic to custom origins."
)
assert result[0].resource_tags == []
def test_distribution_traffic_encrypted_with_s3_config(self):
cloudfront_client = mock.MagicMock
cloudfront_client.distributions = {
DISTRIBUTION_ID: Distribution(
arn=DISTRIBUTION_ARN,
id=DISTRIBUTION_ID,
region=REGION,
origins=[
Origin(
id="origin1",
domain_name="asdf.s3.us-east-1.amazonaws.com",
origin_protocol_policy="",
origin_ssl_protocols=[],
s3_origin_config={
"OriginAccessIdentity": "origin-access-identity/cloudfront/1234567890123456"
},
)
],
default_cache_config=DefaultCacheConfigBehaviour(
realtime_log_config_arn="",
viewer_protocol_policy=ViewerProtocolPolicy.redirect_to_https,
field_level_encryption_id="",
),
default_root_object="index.html",
viewer_protocol_policy="redirect-to-https",
)
}
with mock.patch(
"prowler.providers.aws.services.cloudfront.cloudfront_service.CloudFront",
new=cloudfront_client,
):
# Test Check
from prowler.providers.aws.services.cloudfront.cloudfront_distributions_origin_traffic_encrypted.cloudfront_distributions_origin_traffic_encrypted import (
cloudfront_distributions_origin_traffic_encrypted,
)
check = cloudfront_distributions_origin_traffic_encrypted()
result = check.execute()
assert len(result) == 1
assert result[0].region == REGION
assert result[0].resource_arn == DISTRIBUTION_ARN
assert result[0].resource_id == DISTRIBUTION_ID
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"CloudFront Distribution {DISTRIBUTION_ID} does encrypt traffic to custom origins."
)
assert result[0].resource_tags == []