From e279f7fcfd73767e8e47db342f0bbcb4689644ee Mon Sep 17 00:00:00 2001 From: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com> Date: Wed, 1 Oct 2025 19:55:26 +0200 Subject: [PATCH] fix: handle eks cluster version and listener certificate arn not in acm (#8802) --- prowler/CHANGELOG.md | 11 ++ .../eks_cluster_uses_a_supported_version.py | 4 + .../elb_ssl_listeners_use_acm_certificate.py | 10 +- .../eks_cluster_uses_a_supported_version.py | 33 +++- ..._ssl_listeners_use_acm_certificate_test.py | 141 ++++++++++++++++++ 5 files changed, 195 insertions(+), 4 deletions(-) diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index b1446c5a48..6ba03f03c9 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -31,6 +31,15 @@ All notable changes to the **Prowler SDK** are documented in this file. - Fix SNS topics showing empty AWS_ResourceID in Quick Inventory output [(#8762)](https://github.com/prowler-cloud/prowler/issues/8762) - Fix HTML Markdown output for long strings [(#8803)](https://github.com/prowler-cloud/prowler/pull/8803) +--- + +## [v5.12.4] (Prowler UNRELEASED) + +### Fixed +- Fix KeyError in `elb_ssl_listeners_use_acm_certificate` check and handle None cluster version in `eks_cluster_uses_a_supported_version` check [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791) + +--- + ## [v5.12.1] (Prowler v5.12.1) ### Fixed @@ -38,6 +47,8 @@ All notable changes to the **Prowler SDK** are documented in this file. - `firehose_stream_encrypted_at_rest` check false positives and new api call in kafka service [(#8599)](https://github.com/prowler-cloud/prowler/pull/8599) - Replace defender rules policies key to use old name [(#8702)](https://github.com/prowler-cloud/prowler/pull/8702) +--- + ## [v5.12.0] (Prowler v5.12.0) ### Added diff --git a/prowler/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py b/prowler/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py index 8d98f2950e..a2669994be 100644 --- a/prowler/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py +++ b/prowler/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py @@ -16,6 +16,10 @@ class eks_cluster_uses_a_supported_version(Check): for cluster in eks_client.clusters: report = Check_Report_AWS(metadata=self.metadata(), resource=cluster) + # Handle case where cluster.version might be None (edge case during cluster creation/deletion) + if not cluster.version: + continue + cluster_version_major, cluster_version_minor = map( int, cluster.version.split(".") ) diff --git a/prowler/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate.py b/prowler/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate.py index fef41a94ed..f4bb9eae5a 100644 --- a/prowler/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate.py +++ b/prowler/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate.py @@ -15,8 +15,14 @@ class elb_ssl_listeners_use_acm_certificate(Check): if ( listener.certificate_arn and listener.protocol in secure_protocols - and acm_client.certificates[listener.certificate_arn].type - != "AMAZON_ISSUED" + and ( + listener.certificate_arn not in acm_client.certificates + or ( + acm_client.certificates.get(listener.certificate_arn) + and acm_client.certificates[listener.certificate_arn].type + != "AMAZON_ISSUED" + ) + ) ): report.status = "FAIL" report.status_extended = f"ELB {lb.name} has HTTPS/SSL listeners that are using certificates not managed by ACM." diff --git a/tests/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py b/tests/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py index 425a4e9332..0868eeba9e 100644 --- a/tests/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py +++ b/tests/providers/aws/services/eks/eks_cluster_uses_a_supported_version/eks_cluster_uses_a_supported_version.py @@ -13,6 +13,7 @@ class Test_eks_cluster_ensure_version_is_supported: def test_no_clusters(self): eks_client = mock.MagicMock eks_client.clusters = [] + eks_client.audit_config = {"eks_cluster_oldest_version_supported": "1.28"} with mock.patch( "prowler.providers.aws.services.eks.eks_service.EKS", eks_client, @@ -53,7 +54,7 @@ class Test_eks_cluster_ensure_version_is_supported: assert result[0].status == "FAIL" assert ( result[0].status_extended - == f"EKS cluster {cluster_name} is in version 1.22. It should be one of the next supported versions: 1.28 or higher" + == f"EKS cluster {cluster_name} is using version 1.22. It should be one of the supported versions: 1.28 or higher." ) assert result[0].resource_id == cluster_name assert result[0].resource_arn == cluster_arn @@ -88,7 +89,7 @@ class Test_eks_cluster_ensure_version_is_supported: assert result[0].status == "FAIL" assert ( result[0].status_extended - == f"EKS cluster {cluster_name} is in version 0.22. It should be one of the next supported versions: 1.28 or higher" + == f"EKS cluster {cluster_name} is using version 0.22. It should be one of the supported versions: 1.28 or higher." ) assert result[0].resource_id == cluster_name assert result[0].resource_arn == cluster_arn @@ -199,3 +200,31 @@ class Test_eks_cluster_ensure_version_is_supported: assert result[0].resource_arn == cluster_arn assert result[0].resource_tags == [] assert result[0].region == AWS_REGION_EU_WEST_1 + + def test_eks_cluster_with_none_version(self): + """Test EKS cluster with version=None - should return FAIL gracefully""" + eks_client = mock.MagicMock + eks_client.audit_config = {"eks_cluster_oldest_version_supported": "1.28"} + eks_client.clusters = [] + eks_client.clusters.append( + EKSCluster( + name=cluster_name, + version=None, # This should trigger the AttributeError in current implementation + arn=cluster_arn, + region=AWS_REGION_EU_WEST_1, + logging=None, + ) + ) + + with mock.patch( + "prowler.providers.aws.services.eks.eks_service.EKS", + eks_client, + ): + from prowler.providers.aws.services.eks.eks_cluster_uses_a_supported_version.eks_cluster_uses_a_supported_version import ( + eks_cluster_uses_a_supported_version, + ) + + check = eks_cluster_uses_a_supported_version() + result = check.execute() + + assert len(result) == 0 diff --git a/tests/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate_test.py b/tests/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate_test.py index 1fef4bd613..0dd909d865 100644 --- a/tests/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate_test.py +++ b/tests/providers/aws/services/elb/elb_ssl_listeners_use_acm_certificate/elb_ssl_listeners_use_acm_certificate_test.py @@ -364,3 +364,144 @@ class Test_elb_ssl_listeners_use_acm_certificate: assert result[0].resource_arn == elb_arn assert result[0].resource_tags == [] assert result[0].region == AWS_REGION + + @mock_aws + def test_elb_with_HTTPS_listener_IAM_certificate(self): + """Test ELB with HTTPS listener using IAM certificate (not ACM) - should return FAIL""" + elb = client("elb", region_name=AWS_REGION) + ec2 = resource("ec2", region_name=AWS_REGION) + + # Create IAM certificate (not ACM) + iam_certificate_arn = ( + f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:server-certificate/test-certificate" + ) + + security_group = ec2.create_security_group( + GroupName="sg01", Description="Test security group sg01" + ) + + elb.create_load_balancer( + LoadBalancerName="my-lb", + Listeners=[ + { + "Protocol": "https", + "LoadBalancerPort": 80, + "InstancePort": 8080, + "SSLCertificateId": iam_certificate_arn, + }, + ], + AvailabilityZones=[AWS_REGION_EU_WEST_1_AZA], + Scheme="internal", + SecurityGroups=[security_group.id], + ) + + from prowler.providers.aws.services.acm.acm_service import ACM + from prowler.providers.aws.services.elb.elb_service import ELB + + aws_mocked_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_mocked_provider, + ), + mock.patch( + "prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.elb_client", + new=ELB(aws_mocked_provider), + ), + mock.patch( + "prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.acm_client", + new=ACM(aws_mocked_provider), + ), + ): + from prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate import ( + elb_ssl_listeners_use_acm_certificate, + ) + + check = elb_ssl_listeners_use_acm_certificate() + + # This should now work correctly and return FAIL for IAM certificate + # (unless there's still a KeyError in the current implementation) + result = check.execute() + + # Expected behavior: FAIL because IAM certificate is not managed by ACM + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "ELB my-lb has HTTPS/SSL listeners that are using certificates not managed by ACM." + ) + assert result[0].resource_id == "my-lb" + assert result[0].resource_arn == elb_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION + + @mock_aws + def test_elb_with_HTTPS_listener_certificate_not_in_acm(self): + """Test ELB with HTTPS listener using certificate that triggers not in acm_client.certificates condition""" + elb = client("elb", region_name=AWS_REGION) + ec2 = resource("ec2", region_name=AWS_REGION) + + # Create a certificate ARN that will NOT be in ACM (simulating IAM certificate or any non-ACM certificate) + # This will trigger the first condition: listener.certificate_arn not in acm_client.certificates + non_acm_certificate_arn = ( + f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:server-certificate/non-acm-cert" + ) + + security_group = ec2.create_security_group( + GroupName="sg01", Description="Test security group sg01" + ) + + elb.create_load_balancer( + LoadBalancerName="my-lb", + Listeners=[ + { + "Protocol": "https", + "LoadBalancerPort": 80, + "InstancePort": 8080, + "SSLCertificateId": non_acm_certificate_arn, + }, + ], + AvailabilityZones=[AWS_REGION_EU_WEST_1_AZA], + Scheme="internal", + SecurityGroups=[security_group.id], + ) + + from prowler.providers.aws.services.acm.acm_service import ACM + from prowler.providers.aws.services.elb.elb_service import ELB + + aws_mocked_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_mocked_provider, + ), + mock.patch( + "prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.elb_client", + new=ELB(aws_mocked_provider), + ), + mock.patch( + "prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate.acm_client", + new=ACM(aws_mocked_provider), + ), + ): + from prowler.providers.aws.services.elb.elb_ssl_listeners_use_acm_certificate.elb_ssl_listeners_use_acm_certificate import ( + elb_ssl_listeners_use_acm_certificate, + ) + + check = elb_ssl_listeners_use_acm_certificate() + result = check.execute() + + # This should trigger the first condition: listener.certificate_arn not in acm_client.certificates + # and return FAIL without ever reaching the second part of the OR condition + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "ELB my-lb has HTTPS/SSL listeners that are using certificates not managed by ACM." + ) + assert result[0].resource_id == "my-lb" + assert result[0].resource_arn == elb_arn + assert result[0].resource_tags == [] + assert result[0].region == AWS_REGION