fix(k8s): deduplicate RBAC findings by unique subject (#10242)

Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2026-05-04 18:11:38 +02:00
committed by GitHub
parent 02f43a7ad6
commit f314725f4d
6 changed files with 128 additions and 69 deletions
+8
View File
@@ -32,6 +32,14 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
@@ -11,24 +11,32 @@ resources = ["certificatesigningrequests/approval"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings
@@ -11,21 +11,32 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings
@@ -14,24 +14,32 @@ verbs = ["create", "update", "delete"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings