Compare commits

...

8 Commits

Author SHA1 Message Date
Pepe Fagoaga
8a94ce7804 fix: collisions 2026-03-13 14:48:26 +01:00
Pepe Fagoaga
9790b1da90 fix: changelog 2026-03-13 13:14:58 +01:00
Pepe Fagoaga
b499620128 Merge branch 'master' of github.com:prowler-cloud/prowler into k8s-dedup-rbac-findings-subject 2026-03-13 13:12:19 +01:00
Andoni A.
ec1a0ee8b7 docs(k8s): add changelog entry for RBAC findings deduplication 2026-03-04 08:59:49 +01:00
Andoni A.
1849520008 fix(k8s): simplify resource_id, document ServiceAccount exclusion
Remove dead namespace branch in resource_id (User/Group subjects in
ClusterRoleBindings always have namespace=""). Add comment explaining
why ServiceAccount subjects are filtered out. Simplify loop variable
unpacking.
2026-03-04 08:50:53 +01:00
Andoni A.
0b06475916 fix(k8s): include namespace in resource_id, drop resource_name override
Include namespace in resource_id when present (kind/namespace/name) to
match the dedup key and prevent collisions. Remove resource_name
override so the constructor default (subject.name) is used per Prowler
convention.
2026-03-04 08:34:18 +01:00
Andoni A.
a1bc9abf21 fix(k8s): disambiguate RBAC findings by prefixing resource_id with subject kind
- Override resource_id and resource_name to "kind/name" format in all five
  deduplicated RBAC checks so User and Group with the same name produce
  distinct findings
2026-03-04 08:13:31 +01:00
Pepe Fagoaga
e489ffc8c7 fix(k8s): deduplicate RBAC findings by unique subject 2026-03-03 16:25:46 +01:00
6 changed files with 109 additions and 69 deletions

View File

@@ -16,6 +16,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Update M365 Entra ID service metadata to new format [(#9682)](https://github.com/prowler-cloud/prowler/pull/9682)
- Update ResourceType and Categories for Azure Entra ID service metadata [(#10334)](https://github.com/prowler-cloud/prowler/pull/10334)
### 🐞 Fixed
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
### 🔐 Security
- Bump `multipart` to 1.3.1 to fix [GHSA-p2m9-wcp5-6qw3](https://github.com/defnull/multipart/security/advisories/GHSA-p2m9-wcp5-6qw3) [(#10331)](https://github.com/prowler-cloud/prowler/pull/10331)

View File

@@ -11,24 +11,29 @@ resources = ["certificatesigningrequests/approval"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name in role_names:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings

View File

@@ -11,20 +11,29 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name in role_names:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings

View File

@@ -11,21 +11,29 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name in role_names:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings

View File

@@ -11,20 +11,29 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name in role_names:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings

View File

@@ -14,24 +14,29 @@ verbs = ["create", "update", "delete"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name in role_names:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings