From 7c6d658154982e376e30aec216a8af11b7b36cb2 Mon Sep 17 00:00:00 2001 From: Pepe Fagoaga Date: Mon, 4 May 2026 19:54:03 +0200 Subject: [PATCH] fix(k8s): match RBAC rules by apiGroup, not just core (#10969) Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com> --- prowler/CHANGELOG.md | 1 + .../services/rbac/lib/role_permissions.py | 57 ++++---- .../rbac_minimize_csr_approval_access.py | 5 +- .../rbac_minimize_webhook_config_access.py | 5 +- .../rbac/lib/role_permissions_test.py | 124 +++++++++--------- 5 files changed, 101 insertions(+), 91 deletions(-) diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 8f2b480a36..6e4f231ec2 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -37,6 +37,7 @@ All notable changes to the **Prowler SDK** are documented in this file. ### 🐞 Fixed - Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242) +- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969) - Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986) --- diff --git a/prowler/providers/kubernetes/services/rbac/lib/role_permissions.py b/prowler/providers/kubernetes/services/rbac/lib/role_permissions.py index c3db38b2f7..04bf9b4c9c 100644 --- a/prowler/providers/kubernetes/services/rbac/lib/role_permissions.py +++ b/prowler/providers/kubernetes/services/rbac/lib/role_permissions.py @@ -1,36 +1,37 @@ -def is_rule_allowing_permissions(rules, resources, verbs): +def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)): """ - Check Kubernetes role permissions. + Check whether any RBAC rule grants the specified verbs on the specified + resources within the specified API groups. - This function takes in Kubernetes role rules, resources, and verbs, - and checks if any of the rules grant permissions on the specified - resources with the specified verbs. + A rule matches when its `apiGroups` includes any of `api_groups` (or "*"), + its `resources` includes any of `resources` (or "*"), and its `verbs` + includes any of `verbs` (or "*"). Args: - rules (List[Rule]): The list of Kubernetes role rules. - resources (List[str]): The list of resources to check permissions for. - verbs (List[str]): The list of verbs to check permissions for. + rules (List[Rule]): RBAC rules from a Role or ClusterRole. + resources (List[str]): Resources (or sub-resources) to check. + verbs (List[str]): Verbs to check. + api_groups (Iterable[str]): API groups the resources live in. Defaults + to ("",), the core API group, which matches the most common case. + Pass an explicit value for resources outside the core group, e.g. + ("admissionregistration.k8s.io",) for webhook configurations. Returns: - bool: True if any of the rules grant permissions, False otherwise. + bool: True if any rule grants the permission, False otherwise. """ - if rules: - # Iterate through each rule in the list of rules - for rule in rules: - # Ensure apiGroups are relevant ("" or "v1" for secrets) - if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups): - continue # Skip rules with unrelated apiGroups - # Check if the rule has resources, verbs, and matches any of the specified resources and verbs - if ( - rule.resources - and ( - any(resource in rule.resources for resource in resources) - or "*" in rule.resources - ) - and rule.verbs - and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs) - ): - # If the rule matches, return True - return True - # If no rule matches, return False + if not rules: + return False + for rule in rules: + rule_api_groups = rule.apiGroups or [""] + if not ( + any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups + ): + continue + if ( + rule.resources + and (any(r in rule.resources for r in resources) or "*" in rule.resources) + and rule.verbs + and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs) + ): + return True return False diff --git a/prowler/providers/kubernetes/services/rbac/rbac_minimize_csr_approval_access/rbac_minimize_csr_approval_access.py b/prowler/providers/kubernetes/services/rbac/rbac_minimize_csr_approval_access/rbac_minimize_csr_approval_access.py index 17e56add55..2b86f0cafe 100644 --- a/prowler/providers/kubernetes/services/rbac/rbac_minimize_csr_approval_access/rbac_minimize_csr_approval_access.py +++ b/prowler/providers/kubernetes/services/rbac/rbac_minimize_csr_approval_access/rbac_minimize_csr_approval_access.py @@ -6,6 +6,7 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client verbs = ["update", "patch"] resources = ["certificatesigningrequests/approval"] +api_groups = ["certificates.k8s.io"] class rbac_minimize_csr_approval_access(Check): @@ -33,7 +34,9 @@ class rbac_minimize_csr_approval_access(Check): report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource." for role_name in role_names: cr = cluster_roles_by_name.get(role_name) - if cr and is_rule_allowing_permissions(cr.rules, resources, verbs): + if cr and is_rule_allowing_permissions( + cr.rules, resources, verbs, api_groups + ): report.status = "FAIL" report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource." break diff --git a/prowler/providers/kubernetes/services/rbac/rbac_minimize_webhook_config_access/rbac_minimize_webhook_config_access.py b/prowler/providers/kubernetes/services/rbac/rbac_minimize_webhook_config_access/rbac_minimize_webhook_config_access.py index ffdad52a19..e646efeeef 100644 --- a/prowler/providers/kubernetes/services/rbac/rbac_minimize_webhook_config_access/rbac_minimize_webhook_config_access.py +++ b/prowler/providers/kubernetes/services/rbac/rbac_minimize_webhook_config_access/rbac_minimize_webhook_config_access.py @@ -9,6 +9,7 @@ resources = [ "mutatingwebhookconfigurations", ] verbs = ["create", "update", "delete"] +api_groups = ["admissionregistration.k8s.io"] class rbac_minimize_webhook_config_access(Check): @@ -36,7 +37,9 @@ class rbac_minimize_webhook_config_access(Check): report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations." for role_name in role_names: cr = cluster_roles_by_name.get(role_name) - if cr and is_rule_allowing_permissions(cr.rules, resources, verbs): + if cr and is_rule_allowing_permissions( + cr.rules, resources, verbs, api_groups + ): report.status = "FAIL" report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations." break diff --git a/tests/providers/kubernetes/services/rbac/lib/role_permissions_test.py b/tests/providers/kubernetes/services/rbac/lib/role_permissions_test.py index 696550c06c..94028547e9 100644 --- a/tests/providers/kubernetes/services/rbac/lib/role_permissions_test.py +++ b/tests/providers/kubernetes/services/rbac/lib/role_permissions_test.py @@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule class TestCheckRolePermissions: def test_is_rule_allowing_permissions(self): - # Define some sample rules, resources, and verbs for testing rules = [ - # Rule 1: Allows 'get' and 'list' on 'pods' and 'services' Rule(resources=["pods", "services"], verbs=["get", "list"]), - # Rule 2: Allows 'create' and 'delete' on 'deployments' Rule(resources=["deployments"], verbs=["create", "delete"]), ] - resources = ["pods", "deployments"] - verbs = ["get", "create"] - - assert is_rule_allowing_permissions(rules, resources, verbs) + assert is_rule_allowing_permissions( + rules, ["pods", "deployments"], ["get", "create"] + ) def test_no_permissions(self): - # Test when there are no rules - rules = [] - resources = ["pods", "deployments"] - verbs = ["get", "create"] - - assert not is_rule_allowing_permissions(rules, resources, verbs) + assert not is_rule_allowing_permissions([], ["pods"], ["get"]) def test_no_matching_rules(self): - # Test when there are rules, but none match the specified resources and verbs rules = [ Rule(resources=["services"], verbs=["get", "list"]), Rule(resources=["pods"], verbs=["create", "delete"]), ] - resources = ["deployments", "configmaps"] - verbs = ["get", "create"] - - assert not is_rule_allowing_permissions(rules, resources, verbs) + assert not is_rule_allowing_permissions( + rules, ["deployments", "configmaps"], ["get", "create"] + ) def test_empty_rules(self): - # Test when the rules list is empty - rules = [] - resources = ["pods", "deployments"] - verbs = ["get", "create"] - - assert not is_rule_allowing_permissions(rules, resources, verbs) + assert not is_rule_allowing_permissions([], ["pods"], ["get"]) def test_empty_resources_and_verbs(self): - # Test when resources and verbs are empty lists - rules = [ - Rule(resources=["pods"], verbs=["get"]), - Rule(resources=["services"], verbs=["list"]), - ] - resources = [] - verbs = [] - - assert not is_rule_allowing_permissions(rules, resources, verbs) + rules = [Rule(resources=["pods"], verbs=["get"])] + assert not is_rule_allowing_permissions(rules, [], []) def test_matching_rule_with_empty_resources_or_verbs(self): - # Test when a rule matches, but either resources or verbs are empty + rules = [Rule(resources=["pods"], verbs=["get"])] + assert not is_rule_allowing_permissions(rules, [], ["get"]) + assert not is_rule_allowing_permissions(rules, ["pods"], []) + + def test_rule_with_non_matching_api_group(self): + rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])] + assert not is_rule_allowing_permissions(rules, ["pods"], ["get"]) + + def test_rule_with_matching_api_group(self): + rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])] + assert is_rule_allowing_permissions(rules, ["pods"], ["get"]) + + def test_default_api_group_is_core(self): + rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)] + assert is_rule_allowing_permissions(rules, ["pods"], ["get"]) + + def test_rule_with_empty_api_groups_does_not_match_non_core_request(self): + rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)] + assert not is_rule_allowing_permissions( + rules, ["pods"], ["get"], ["admissionregistration.k8s.io"] + ) + + def test_non_core_rule_does_not_match_without_api_groups_argument(self): rules = [ - Rule(resources=["pods"], verbs=["get"]), - Rule(resources=["services"], verbs=["list"]), + Rule( + resources=["validatingwebhookconfigurations"], + verbs=["create"], + apiGroups=["admissionregistration.k8s.io"], + ) ] - resources = [] - verbs = ["get"] + assert not is_rule_allowing_permissions( + rules, ["validatingwebhookconfigurations"], ["create"] + ) - assert not is_rule_allowing_permissions(rules, resources, verbs) - - resources = ["pods"] - verbs = [] - - assert not is_rule_allowing_permissions(rules, resources, verbs) - - def test_rule_with_ignored_api_groups(self): - # Test when a rule has apiGroups that are not relevant + def test_explicit_non_core_api_group(self): rules = [ - Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]), - Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]), + Rule( + resources=["validatingwebhookconfigurations"], + verbs=["create"], + apiGroups=["admissionregistration.k8s.io"], + ) ] - resources = ["pods"] - verbs = ["get"] + assert is_rule_allowing_permissions( + rules, + ["validatingwebhookconfigurations"], + ["create"], + ["admissionregistration.k8s.io"], + ) - assert not is_rule_allowing_permissions(rules, resources, verbs) + def test_rule_with_wildcard_api_group(self): + rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])] + assert is_rule_allowing_permissions(rules, ["pods"], ["get"]) + assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"]) - def test_rule_with_relevant_api_groups(self): - # Test when a rule has apiGroups that are relevant - rules = [ - Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]), - Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]), - ] - resources = ["pods"] - verbs = ["get"] + def test_rule_with_wildcard_resources(self): + rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])] + assert is_rule_allowing_permissions(rules, ["pods"], ["get"]) - assert is_rule_allowing_permissions(rules, resources, verbs) + def test_rule_with_wildcard_verbs(self): + rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])] + assert is_rule_allowing_permissions(rules, ["pods"], ["get"])