fix: handle eks cluster version and listener certificate arn not in acm (#8802)

This commit is contained in:
Daniel Barranquero
2025-10-01 19:55:26 +02:00
committed by GitHub
parent a555cffebe
commit e279f7fcfd
5 changed files with 195 additions and 4 deletions

View File

@@ -31,6 +31,15 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Fix SNS topics showing empty AWS_ResourceID in Quick Inventory output [(#8762)](https://github.com/prowler-cloud/prowler/issues/8762)
- Fix HTML Markdown output for long strings [(#8803)](https://github.com/prowler-cloud/prowler/pull/8803)
---
## [v5.12.4] (Prowler UNRELEASED)
### Fixed
- Fix KeyError in `elb_ssl_listeners_use_acm_certificate` check and handle None cluster version in `eks_cluster_uses_a_supported_version` check [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
---
## [v5.12.1] (Prowler v5.12.1)
### Fixed
@@ -38,6 +47,8 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `firehose_stream_encrypted_at_rest` check false positives and new api call in kafka service [(#8599)](https://github.com/prowler-cloud/prowler/pull/8599)
- Replace defender rules policies key to use old name [(#8702)](https://github.com/prowler-cloud/prowler/pull/8702)
---
## [v5.12.0] (Prowler v5.12.0)
### Added

View File

@@ -16,6 +16,10 @@ class eks_cluster_uses_a_supported_version(Check):
for cluster in eks_client.clusters:
report = Check_Report_AWS(metadata=self.metadata(), resource=cluster)
# Handle case where cluster.version might be None (edge case during cluster creation/deletion)
if not cluster.version:
continue
cluster_version_major, cluster_version_minor = map(
int, cluster.version.split(".")
)

View File

@@ -15,8 +15,14 @@ class elb_ssl_listeners_use_acm_certificate(Check):
if (
listener.certificate_arn
and listener.protocol in secure_protocols
and acm_client.certificates[listener.certificate_arn].type
!= "AMAZON_ISSUED"
and (
listener.certificate_arn not in acm_client.certificates
or (
acm_client.certificates.get(listener.certificate_arn)
and acm_client.certificates[listener.certificate_arn].type
!= "AMAZON_ISSUED"
)
)
):
report.status = "FAIL"
report.status_extended = f"ELB {lb.name} has HTTPS/SSL listeners that are using certificates not managed by ACM."

View File

@@ -13,6 +13,7 @@ class Test_eks_cluster_ensure_version_is_supported:
def test_no_clusters(self):
eks_client = mock.MagicMock
eks_client.clusters = []
eks_client.audit_config = {"eks_cluster_oldest_version_supported": "1.28"}
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
@@ -53,7 +54,7 @@ class Test_eks_cluster_ensure_version_is_supported:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"EKS cluster {cluster_name} is in version 1.22. It should be one of the next supported versions: 1.28 or higher"
== f"EKS cluster {cluster_name} is using version 1.22. It should be one of the supported versions: 1.28 or higher."
)
assert result[0].resource_id == cluster_name
assert result[0].resource_arn == cluster_arn
@@ -88,7 +89,7 @@ class Test_eks_cluster_ensure_version_is_supported:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"EKS cluster {cluster_name} is in version 0.22. It should be one of the next supported versions: 1.28 or higher"
== f"EKS cluster {cluster_name} is using version 0.22. It should be one of the supported versions: 1.28 or higher."
)
assert result[0].resource_id == cluster_name
assert result[0].resource_arn == cluster_arn
@@ -199,3 +200,31 @@ class Test_eks_cluster_ensure_version_is_supported:
assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1
def test_eks_cluster_with_none_version(self):
"""Test EKS cluster with version=None - should return FAIL gracefully"""
eks_client = mock.MagicMock
eks_client.audit_config = {"eks_cluster_oldest_version_supported": "1.28"}
eks_client.clusters = []
eks_client.clusters.append(
EKSCluster(
name=cluster_name,
version=None, # This should trigger the AttributeError in current implementation
arn=cluster_arn,
region=AWS_REGION_EU_WEST_1,
logging=None,
)
)
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
):
from prowler.providers.aws.services.eks.eks_cluster_uses_a_supported_version.eks_cluster_uses_a_supported_version import (
eks_cluster_uses_a_supported_version,
)
check = eks_cluster_uses_a_supported_version()
result = check.execute()
assert len(result) == 0

View File

@@ -364,3 +364,144 @@ class Test_elb_ssl_listeners_use_acm_certificate:
assert result[0].resource_arn == elb_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_aws
def test_elb_with_HTTPS_listener_IAM_certificate(self):
"""Test ELB with HTTPS listener using IAM certificate (not ACM) - should return FAIL"""
elb = client("elb", region_name=AWS_REGION)
ec2 = resource("ec2", region_name=AWS_REGION)
# Create IAM certificate (not ACM)
iam_certificate_arn = (
f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:server-certificate/test-certificate"
)
security_group = ec2.create_security_group(
GroupName="sg01", Description="Test security group sg01"
)
elb.create_load_balancer(
LoadBalancerName="my-lb",
Listeners=[
{
"Protocol": "https",
"LoadBalancerPort": 80,
"InstancePort": 8080,
"SSLCertificateId": iam_certificate_arn,
},
],
AvailabilityZones=[AWS_REGION_EU_WEST_1_AZA],
Scheme="internal",
SecurityGroups=[security_group.id],
)
from prowler.providers.aws.services.acm.acm_service import ACM
from prowler.providers.aws.services.elb.elb_service import ELB
aws_mocked_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_mocked_provider,
),
mock.patch(
"prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.elb_client",
new=ELB(aws_mocked_provider),
),
mock.patch(
"prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.acm_client",
new=ACM(aws_mocked_provider),
),
):
from prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate import (
elb_ssl_listeners_use_acm_certificate,
)
check = elb_ssl_listeners_use_acm_certificate()
# This should now work correctly and return FAIL for IAM certificate
# (unless there's still a KeyError in the current implementation)
result = check.execute()
# Expected behavior: FAIL because IAM certificate is not managed by ACM
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "ELB my-lb has HTTPS/SSL listeners that are using certificates not managed by ACM."
)
assert result[0].resource_id == "my-lb"
assert result[0].resource_arn == elb_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION
@mock_aws
def test_elb_with_HTTPS_listener_certificate_not_in_acm(self):
"""Test ELB with HTTPS listener using certificate that triggers not in acm_client.certificates condition"""
elb = client("elb", region_name=AWS_REGION)
ec2 = resource("ec2", region_name=AWS_REGION)
# Create a certificate ARN that will NOT be in ACM (simulating IAM certificate or any non-ACM certificate)
# This will trigger the first condition: listener.certificate_arn not in acm_client.certificates
non_acm_certificate_arn = (
f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:server-certificate/non-acm-cert"
)
security_group = ec2.create_security_group(
GroupName="sg01", Description="Test security group sg01"
)
elb.create_load_balancer(
LoadBalancerName="my-lb",
Listeners=[
{
"Protocol": "https",
"LoadBalancerPort": 80,
"InstancePort": 8080,
"SSLCertificateId": non_acm_certificate_arn,
},
],
AvailabilityZones=[AWS_REGION_EU_WEST_1_AZA],
Scheme="internal",
SecurityGroups=[security_group.id],
)
from prowler.providers.aws.services.acm.acm_service import ACM
from prowler.providers.aws.services.elb.elb_service import ELB
aws_mocked_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_mocked_provider,
),
mock.patch(
"prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.elb_client",
new=ELB(aws_mocked_provider),
),
mock.patch(
"prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.acm_client",
new=ACM(aws_mocked_provider),
),
):
from prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate import (
elb_ssl_listeners_use_acm_certificate,
)
check = elb_ssl_listeners_use_acm_certificate()
result = check.execute()
# This should trigger the first condition: listener.certificate_arn not in acm_client.certificates
# and return FAIL without ever reaching the second part of the OR condition
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "ELB my-lb has HTTPS/SSL listeners that are using certificates not managed by ACM."
)
assert result[0].resource_id == "my-lb"
assert result[0].resource_arn == elb_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION