fix(k8s): match RBAC rules by apiGroup, not just core (#10969)

Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2026-05-04 19:54:03 +02:00
committed by GitHub
parent 21d7d08b4b
commit 7c6d658154
5 changed files with 101 additions and 91 deletions
+1
View File
@@ -37,6 +37,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🐞 Fixed
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969)
- Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986)
---
@@ -1,36 +1,37 @@
def is_rule_allowing_permissions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)):
"""
Check Kubernetes role permissions.
Check whether any RBAC rule grants the specified verbs on the specified
resources within the specified API groups.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
A rule matches when its `apiGroups` includes any of `api_groups` (or "*"),
its `resources` includes any of `resources` (or "*"), and its `verbs`
includes any of `verbs` (or "*").
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
rules (List[Rule]): RBAC rules from a Role or ClusterRole.
resources (List[str]): Resources (or sub-resources) to check.
verbs (List[str]): Verbs to check.
api_groups (Iterable[str]): API groups the resources live in. Defaults
to ("",), the core API group, which matches the most common case.
Pass an explicit value for resources outside the core group, e.g.
("admissionregistration.k8s.io",) for webhook configurations.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
bool: True if any rule grants the permission, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
if not rules:
return False
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
rule_api_groups = rule.apiGroups or [""]
if not (
any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups
):
continue
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and (any(r in rule.resources for r in resources) or "*" in rule.resources)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
return False
@@ -6,6 +6,7 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
api_groups = ["certificates.k8s.io"]
class rbac_minimize_csr_approval_access(Check):
@@ -33,7 +34,9 @@ class rbac_minimize_csr_approval_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
@@ -9,6 +9,7 @@ resources = [
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
api_groups = ["admissionregistration.k8s.io"]
class rbac_minimize_webhook_config_access(Check):
@@ -36,7 +37,9 @@ class rbac_minimize_webhook_config_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
@@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permissions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
Rule(resources=["pods", "services"], verbs=["get", "list"]),
# Rule 2: Allows 'create' and 'delete' on 'deployments'
Rule(resources=["deployments"], verbs=["create", "delete"]),
]
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permissions(rules, resources, verbs)
assert is_rule_allowing_permissions(
rules, ["pods", "deployments"], ["get", "create"]
)
def test_no_permissions(self):
# Test when there are no rules
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
rules = [
Rule(resources=["services"], verbs=["get", "list"]),
Rule(resources=["pods"], verbs=["create", "delete"]),
]
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions(
rules, ["deployments", "configmaps"], ["get", "create"]
)
def test_empty_rules(self):
# Test when the rules list is empty
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], [])
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], ["get"])
assert not is_rule_allowing_permissions(rules, ["pods"], [])
def test_rule_with_non_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])]
assert not is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_default_api_group_is_core(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_empty_api_groups_does_not_match_non_core_request(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert not is_rule_allowing_permissions(
rules, ["pods"], ["get"], ["admissionregistration.k8s.io"]
)
def test_non_core_rule_does_not_match_without_api_groups_argument(self):
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = []
verbs = ["get"]
assert not is_rule_allowing_permissions(
rules, ["validatingwebhookconfigurations"], ["create"]
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_ignored_api_groups(self):
# Test when a rule has apiGroups that are not relevant
def test_explicit_non_core_api_group(self):
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = ["pods"]
verbs = ["get"]
assert is_rule_allowing_permissions(
rules,
["validatingwebhookconfigurations"],
["create"],
["admissionregistration.k8s.io"],
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"])
def test_rule_with_relevant_api_groups(self):
# Test when a rule has apiGroups that are relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
def test_rule_with_wildcard_resources(self):
rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_verbs(self):
rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])