feat(aws): new check eks_cluster_deletion_protection_enabled (#8536)

This commit is contained in:
Sergio Garcia
2025-08-19 10:25:24 +02:00
committed by GitHub
parent efdeb431ba
commit 30518f2e0e
6 changed files with 181 additions and 0 deletions
+1
View File
@@ -10,6 +10,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `vm_jit_access_enabled` check for Azure provider [(#8202)](https://github.com/prowler-cloud/prowler/pull/8202)
- Bedrock AgentCore privilege escalation combination for AWS provider [(#8526)](https://github.com/prowler-cloud/prowler/pull/8526)
- Remove standalone iam:PassRole from privesc detection and add missing patterns [(#8530)](https://github.com/prowler-cloud/prowler/pull/8530)
- `eks_cluster_deletion_protection_enabled` check for AWS provider [(#8536)](https://github.com/prowler-cloud/prowler/pull/8536)
### Changed
- Refine kisa isms-p compliance mapping [(#8479)](https://github.com/prowler-cloud/prowler/pull/8479)
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "eks_cluster_deletion_protection_enabled",
"CheckTitle": "Ensure EKS clusters have deletion protection enabled",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices/Resource Management"
],
"ServiceName": "eks",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsEksCluster",
"Description": "Ensure that your Amazon EKS clusters have deletion protection enabled to prevent accidental deletion of critical Kubernetes clusters.",
"Risk": "Without deletion protection, EKS clusters can be accidentally deleted through Terraform automation, AWS CLI commands, or the AWS console, leading to data loss and service disruption.",
"RelatedUrl": "https://docs.aws.amazon.com/eks/latest/userguide/deletion-protection.html",
"Remediation": {
"Code": {
"CLI": "aws eks update-cluster-config --region <region_name> --name <cluster_name> --deletion-protection",
"NativeIaC": "",
"Other": "",
"Terraform": "resource \"aws_eks_cluster\" \"example\" {\n name = \"example-cluster\"\n role_arn = aws_iam_role.example.arn\n deletion_protection = true\n # ... other configuration\n}"
},
"Recommendation": {
"Text": "Enable deletion protection on all EKS clusters to prevent accidental deletion. This is especially important for production clusters and those managed through Infrastructure as Code (IaC) tools.",
"Url": "https://docs.aws.amazon.com/eks/latest/userguide/deletion-protection.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,21 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.eks.eks_client import eks_client
class eks_cluster_deletion_protection_enabled(Check):
def execute(self):
findings = []
for cluster in eks_client.clusters:
report = Check_Report_AWS(metadata=self.metadata(), resource=cluster)
report.status = "PASS"
report.status_extended = (
f"EKS cluster {cluster.name} has deletion protection enabled."
)
if cluster.deletion_protection is False:
report.status = "FAIL"
report.status_extended = (
f"EKS cluster {cluster.name} has deletion protection disabled."
)
findings.append(report)
return findings
@@ -83,6 +83,10 @@ class EKS(AWSService):
]["publicAccessCidrs"]
if "encryptionConfig" in describe_cluster["cluster"]:
cluster.encryptionConfig = True
if "deletionProtection" in describe_cluster["cluster"]:
cluster.deletion_protection = describe_cluster["cluster"][
"deletionProtection"
]
cluster.tags = [describe_cluster["cluster"].get("tags")]
cluster.version = describe_cluster["cluster"].get("version", "")
@@ -108,4 +112,5 @@ class EKSCluster(BaseModel):
endpoint_private_access: bool = None
public_access_cidrs: list[str] = []
encryptionConfig: bool = None
deletion_protection: bool = None
tags: Optional[list] = []
@@ -0,0 +1,122 @@
from unittest import mock
from prowler.providers.aws.services.eks.eks_service import EKSCluster
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
cluster_name = "cluster_test"
cluster_arn = (
f"arn:aws:eks:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:cluster/{cluster_name}"
)
class Test_eks_cluster_deletion_protection_enabled:
def test_no_clusters(self):
eks_client = mock.MagicMock
eks_client.clusters = []
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
):
from prowler.providers.aws.services.eks.eks_cluster_deletion_protection_enabled.eks_cluster_deletion_protection_enabled import (
eks_cluster_deletion_protection_enabled,
)
check = eks_cluster_deletion_protection_enabled()
result = check.execute()
assert len(result) == 0
def test_cluster_deletion_protection_disabled(self):
eks_client = mock.MagicMock
eks_client.clusters = []
eks_client.clusters.append(
EKSCluster(
name=cluster_name,
arn=cluster_arn,
region=AWS_REGION_EU_WEST_1,
deletion_protection=False,
)
)
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
):
from prowler.providers.aws.services.eks.eks_cluster_deletion_protection_enabled.eks_cluster_deletion_protection_enabled import (
eks_cluster_deletion_protection_enabled,
)
check = eks_cluster_deletion_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"EKS cluster {cluster_name} has deletion protection disabled."
)
assert result[0].resource_id == cluster_name
assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1
def test_cluster_deletion_protection_enabled(self):
eks_client = mock.MagicMock
eks_client.clusters = []
eks_client.clusters.append(
EKSCluster(
name=cluster_name,
arn=cluster_arn,
region=AWS_REGION_EU_WEST_1,
deletion_protection=True,
)
)
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
):
from prowler.providers.aws.services.eks.eks_cluster_deletion_protection_enabled.eks_cluster_deletion_protection_enabled import (
eks_cluster_deletion_protection_enabled,
)
check = eks_cluster_deletion_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"EKS cluster {cluster_name} has deletion protection enabled."
)
assert result[0].resource_id == cluster_name
assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1
def test_cluster_deletion_protection_none(self):
eks_client = mock.MagicMock
eks_client.clusters = []
eks_client.clusters.append(
EKSCluster(
name=cluster_name,
arn=cluster_arn,
region=AWS_REGION_EU_WEST_1,
deletion_protection=None,
)
)
with mock.patch(
"prowler.providers.aws.services.eks.eks_service.EKS",
eks_client,
):
from prowler.providers.aws.services.eks.eks_cluster_deletion_protection_enabled.eks_cluster_deletion_protection_enabled import (
eks_cluster_deletion_protection_enabled,
)
check = eks_cluster_deletion_protection_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"EKS cluster {cluster_name} has deletion protection enabled."
)
assert result[0].resource_id == cluster_name
assert result[0].resource_arn == cluster_arn
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_EU_WEST_1