feat(rbac): add 9 checks of Kubernetes RBAC service (#3314)

This commit is contained in:
Sergio Garcia
2024-02-27 13:54:46 +01:00
committed by GitHub
parent 364a945d28
commit effc743b6e
28 changed files with 826 additions and 8 deletions

View File

@@ -0,0 +1,33 @@
def is_rule_allowing_permisions(rules, resources, verbs):
"""
Check Kubernetes role permissions.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
return False

View File

@@ -6,12 +6,12 @@ class rbac_cluster_admin_usage(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Iterate through the bindings
for binding in rbac_client.cluster_role_bindings:
for binding in rbac_client.cluster_role_bindings.values():
# Check if the binding refers to the cluster-admin role
if binding.roleRef.name == "cluster-admin":
report = Check_Report_Kubernetes(self.metadata())
report.namespace = (
"kube-system"
"cluster-wide"
if not binding.metadata.namespace
else binding.metadata.namespace
)

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_csr_approval_access",
"CheckTitle": "Minimize access to the approval sub-resource of certificatesigningrequests objects",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "CSR Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "CertificateSigningRequestApproval",
"Description": "This check ensures that access to the approval sub-resource of certificate signing request (CSR) objects is restricted. Access to update the approval sub-resource can lead to privilege escalation, allowing creation of new high-privileged user accounts in the cluster.",
"Risk": "Unauthorized access to update the approval sub-resource of CSR objects can lead to significant security vulnerabilities, including unauthorized user creation and privilege escalation.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#csrs-and-certificate-issuing",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict access to the approval sub-resource of CSR objects. Ensure that only trusted users or service accounts have the required permissions.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to the approval sub-resource of CSR objects in the cluster.",
"Url": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#csrs-and-certificate-issuing"
}
},
"Categories": [
"Access Control",
"Privilege Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Carefully evaluate which users or service accounts require the ability to update the approval sub-resource of CSR objects."
}

View File

@@ -0,0 +1,35 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = subject.name
report.resource_id = subject.uid if hasattr(subject, "uid") else ""
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_node_proxy_subresource_access",
"CheckTitle": "Minimize access to the proxy sub-resource of nodes",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Node Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NodeProxySubResource",
"Description": "This check ensures that access to the proxy sub-resource of node objects is restricted. Access to this sub-resource can grant privileges to use the Kubelet API directly, bypassing Kubernetes API controls like audit logging and admission control, potentially leading to privilege escalation.",
"Risk": "Unauthorized access to the proxy sub-resource of node objects can lead to significant security vulnerabilities, including privilege escalation.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#access-to-proxy-subresource-of-nodes",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict access to the proxy sub-resource of node objects. Ensure that only trusted users have the required permissions.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to the proxy sub-resource of node objects in the cluster.",
"Url": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#access-to-proxy-subresource-of-nodes"
}
},
"Categories": [
"Access Control",
"Privilege Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Carefully evaluate which users or service accounts require the ability to access the proxy sub-resource of node objects."
}

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["get", "list", "watch"]
resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = subject.name
report.resource_id = subject.uid if hasattr(subject, "uid") else ""
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_pod_creation_access",
"CheckTitle": "Minimize access to create pods",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Pod Creation Access Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Pod",
"Description": "This check ensures that the ability to create pods in a Kubernetes cluster is restricted to a minimal group of users. Limiting pod creation access mitigates the risk of privilege escalation and exposure of sensitive data.",
"Risk": "Unrestricted access to create pods can lead to potential security risks and privilege escalation within the cluster.",
"RelatedUrl": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC settings to restrict pod creation access to necessary users only. Use `kubectl get clusterrolebindings -o yaml` and `kubectl get rolebindings --all-namespaces -o yaml` to identify users with pod creation privileges.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict pod creation access to minimize security risks.",
"Url": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/#role-and-clusterrole"
}
},
"Categories": [
"Access Control",
"Policy Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Care should be taken to ensure that restrictions do not disrupt normal operations of the cluster."
}

View File

@@ -0,0 +1,49 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["create"]
resources = ["pods"]
class rbac_minimize_pod_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check ClusterRoleBindings for pod create access
for cr in rbac_client.cluster_roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = cr.metadata.name
report.resource_id = cr.metadata.uid
report.status = "PASS"
report.status_extended = (
f"ClusterRole {cr.metadata.name} does not have pod create access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"ClusterRole {cr.metadata.name} has pod create access."
)
findings.append(report)
# Check RoleBindings for pod create access
for role in rbac_client.roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = role.metadata.namespace
report.resource_name = role.metadata.name
report.resource_id = role.metadata.uid
report.status = "PASS"
report.status_extended = (
f"Role {role.metadata.name} does not have pod create access."
)
if is_rule_allowing_permisions(role.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"Role {role.metadata.name} has pod create access."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_pv_creation_access",
"CheckTitle": "Minimize access to create persistent volumes",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Persistent Volume Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "PersistentVolume",
"Description": "This check ensures that the ability to create persistent volumes in Kubernetes is restricted to authorized users only. Limiting this capability helps prevent privilege escalation scenarios through the creation of hostPath volumes.",
"Risk": "Excessive permissions to create persistent volumes can lead to unauthorized access to sensitive host files, overriding the restrictions imposed by Pod Security Admission policies.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#persistent-volume-creation",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict creation access to PersistentVolume objects. Ensure that only trusted users have the required permissions.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to create persistent volumes in the cluster.",
"Url": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#persistent-volume-creation"
}
},
"Categories": [
"Access Control",
"Privilege Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Carefully evaluate which users or service accounts require the ability to create PersistentVolumes and restrict access accordingly."
}

View File

@@ -0,0 +1,32 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["create"]
resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = subject.name
report.resource_id = subject.uid if hasattr(subject, "uid") else ""
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_secret_access",
"CheckTitle": "Minimize access to secrets",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "Kubernetes API",
"SubServiceName": "Secrets Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Secrets",
"Description": "This check ensures that access to secrets in the Kubernetes API is restricted to the smallest possible group of users. Minimizing access to secrets helps in reducing the risk of privilege escalation and potential unauthorized access to sensitive data.",
"Risk": "Inappropriate access to secrets can lead to escalation of privileges and unauthorized access to cluster resources or external resources managed through the secrets.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/configuration/secret/",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict get, list, and watch permissions for secrets. Use Role and RoleBinding for namespace-specific access, and ClusterRole and ClusterRoleBinding for cluster-wide access.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to Kubernetes secrets to the smallest possible set of users.",
"Url": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/"
}
},
"Categories": [
"Access Control",
"Data Security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Care should be taken to avoid disrupting system components that require access to secrets for proper functioning."
}

View File

@@ -0,0 +1,47 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["get", "list", "watch"]
resources = ["secret"]
class rbac_minimize_secret_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check ClusterRoleBindings for seceret access
for cr in rbac_client.cluster_roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = cr.metadata.name
report.resource_id = cr.metadata.uid
report.status = "PASS"
report.status_extended = (
f"ClusterRole {cr.metadata.name} does not have secret access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"ClusterRole {cr.metadata.name} has secret access."
)
findings.append(report)
# Check RoleBindings for secret access
for role in rbac_client.roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = role.metadata.namespace
report.resource_name = role.metadata.name
report.resource_id = role.metadata.uid
report.status = "PASS"
report.status_extended = (
f"Role {role.metadata.name} does not have secret access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"Role {role.metadata.name} has secret access."
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_service_account_token_creation",
"CheckTitle": "Minimize access to the service account token creation",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Service Account Token Creation",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "ServiceAccountToken",
"Description": "This check ensures that access to create new service account tokens is restricted within the Kubernetes cluster. Unrestricted token creation can lead to privilege escalation and persistent unauthorized access to the cluster.",
"Risk": "Granting excessive permissions for service account token creation can lead to abuse and compromise of cluster security.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#token-request",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict access to the token sub-resource of serviceaccount objects. Ensure only trusted users or service accounts have necessary permissions.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to service account token creation in the cluster.",
"Url": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#token-request"
}
},
"Categories": [
"Access Control",
"Configuration Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Consider using role-based access control to precisely define and manage permissions related to service account token creation."
}

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["create"]
resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = subject.name
report.resource_id = subject.uid if hasattr(subject, "uid") else ""
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_webhook_config_access",
"CheckTitle": "Minimize access to webhook configuration objects",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Webhook Configuration",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "WebhookConfiguration",
"Description": "This check ensures that access to webhook configuration objects (validatingwebhookconfigurations and mutatingwebhookconfigurations) is restricted. Unauthorized access or modification of these objects can lead to privilege escalation or disruption of cluster operations.",
"Risk": "Inadequately restricted access to webhook configurations can result in unauthorized control over webhooks, potentially allowing privilege escalation or interference with cluster functionality.",
"RelatedUrl": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#control-admission-webhooks",
"Remediation": {
"Code": {
"CLI": "Review and update RBAC configurations to restrict access to validatingwebhookconfigurations and mutatingwebhookconfigurations. Ensure only trusted users or service accounts have necessary permissions.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict access to webhook configuration objects in the cluster.",
"Url": "https://kubernetes.io/docs/concepts/security/rbac-good-practices/#control-admission-webhooks"
}
},
"Categories": [
"Access Control",
"Configuration Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Consider using role-based access control to precisely define and manage permissions related to webhook configurations."
}

View File

@@ -0,0 +1,38 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
resources = [
"validatingwebhookconfigurations",
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = subject.name
report.resource_id = subject.uid if hasattr(subject, "uid") else ""
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "rbac_minimize_wildcard_use_roles",
"CheckTitle": "Minimize wildcard use in Roles and ClusterRoles",
"CheckType": [
"Security",
"Configuration"
],
"ServiceName": "RBAC",
"SubServiceName": "Roles Management",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Role/ClusterRole",
"Description": "This check ensures that Roles and ClusterRoles in Kubernetes minimize the use of wildcards. Restricting wildcards enhances security by enforcing the principle of least privilege, ensuring users have only the access required for their role.",
"Risk": "Use of wildcards can lead to excessive rights being granted, potentially allowing users to access or modify resources beyond their scope of responsibility.",
"RelatedUrl": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
"Remediation": {
"Code": {
"CLI": "Review and update Roles and ClusterRoles to replace wildcards with specific resource names and actions. Use `kubectl get roles --all-namespaces -o yaml` and `kubectl get clusterroles -o yaml` to list current configurations.",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Replace wildcards in roles and clusterroles with specific permissions.",
"Url": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/#referring-to-resources"
}
},
"Categories": [
"Access Control",
"Policy Management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Care should be taken to ensure that replacing wildcards does not disrupt normal operations of the cluster."
}

View File

@@ -0,0 +1,50 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
class rbac_minimize_wildcard_use_roles(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check ClusterRoles for wildcards
for cr in rbac_client.cluster_roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = "cluster-wide"
report.resource_name = cr.metadata.name
report.resource_id = cr.metadata.uid
report.status = "PASS"
report.status_extended = (
f"ClusterRole {cr.metadata.name} does not use wildcards."
)
for rule in cr.rules:
if (rule.resources and "*" in str(rule.resources)) or (
rule.verbs and "*" in rule.verbs
):
report.status = "FAIL"
report.status_extended = (
f"ClusterRole {cr.metadata.name} uses wildcards."
)
findings.append(report)
# Check Roles for wildcards
for role in rbac_client.roles.values():
report = Check_Report_Kubernetes(self.metadata())
report.namespace = role.metadata.namespace
report.resource_name = role.metadata.name
report.resource_id = role.metadata.uid
report.status = "PASS"
report.status_extended = (
f"Role {role.metadata.name} does not use wildcards."
)
for rule in role.rules:
if (rule.resources and "*" in str(rule.resources)) or (
rule.verbs and "*" in rule.verbs
):
report.status = "FAIL"
report.status_extended = (
f"Role {role.metadata.name} uses wildcards."
)
findings.append(report)
return findings

View File

@@ -13,11 +13,14 @@ class Rbac(KubernetesService):
super().__init__(audit_info)
self.client = client.RbacAuthorizationV1Api()
self.cluster_role_bindings = self.__list_cluster_role_binding__()
self.cluster_role_bindings = self.__list_cluster_role_bindings__()
self.role_bindings = self.__list_role_bindings__()
self.cluster_roles = self.__list_cluster_roles__()
self.roles = self.__list_roles__()
def __list_cluster_role_binding__(self):
def __list_cluster_role_bindings__(self):
try:
bindings_list = []
bindings = {}
for binding in self.client.list_cluster_role_binding().items:
# For each binding, create a ClusterRoleBinding object and append it to the list
formatted_binding = {
@@ -29,6 +32,7 @@ class Rbac(KubernetesService):
"kind": subject.kind,
"name": subject.name,
"namespace": getattr(subject, "namespace", ""),
"metadata": getattr(subject, "metadata", None),
}
for subject in binding.subjects
],
@@ -38,18 +42,99 @@ class Rbac(KubernetesService):
"apiGroup": binding.role_ref.api_group,
},
}
bindings_list.append(ClusterRoleBinding(**formatted_binding))
return bindings_list
bindings[binding.metadata.uid] = ClusterRoleBinding(**formatted_binding)
return bindings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return []
def __list_role_bindings__(self):
try:
role_bindings = {}
for binding in self.client.list_role_binding_for_all_namespaces().items:
formatted_binding = {
"metadata": binding.metadata,
"subjects": [
{
"kind": subject.kind,
"name": subject.name,
"namespace": getattr(subject, "namespace", None),
"metadata": getattr(subject, "metadata", None),
}
for subject in binding.subjects
],
"roleRef": {
"kind": binding.role_ref.kind,
"name": binding.role_ref.name,
"apiGroup": binding.role_ref.api_group,
},
}
role_bindings[binding.metadata.uid] = RoleBinding(**formatted_binding)
return role_bindings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return []
def __list_roles__(self):
try:
roles = {}
for role in self.client.list_role_for_all_namespaces().items:
formatted_role = {
"uid": role.metadata.uid,
"name": role.metadata.name,
"metadata": role.metadata,
"rules": [
{
"apiGroups": rule.api_groups,
"resources": rule.resources,
"verbs": rule.verbs,
}
for rule in role.rules
],
}
roles[role.metadata.uid] = Role(**formatted_role)
return roles
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return []
def __list_cluster_roles__(self):
try:
cluster_roles = {}
for role in self.client.list_cluster_role().items:
formatted_role = {
"uid": role.metadata.uid,
"name": role.metadata.name,
"metadata": role.metadata,
"rules": [
{
"apiGroups": rule.api_groups,
"resources": rule.resources,
"verbs": rule.verbs,
}
for rule in role.rules
],
}
cluster_roles[role.metadata.uid] = ClusterRole(**formatted_role)
return cluster_roles
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return []
class Subject(BaseModel):
kind: str
name: str
namespace: Optional[str] = None
namespace: Optional[str]
metadata: Any
class RoleRef(BaseModel):
@@ -62,3 +147,29 @@ class ClusterRoleBinding(BaseModel):
metadata: Any
subjects: List[Subject]
roleRef: RoleRef
class RoleBinding(BaseModel):
metadata: Any
subjects: List[Subject]
roleRef: RoleRef
class Rule(BaseModel):
apiGroups: Optional[List[str]]
resources: Optional[List[str]]
verbs: Optional[List[str]]
class Role(BaseModel):
name: str
uid: str
metadata: Any
rules: List[Rule]
class ClusterRole(BaseModel):
name: str
uid: str
metadata: Any
rules: List[Rule]

View File

@@ -0,0 +1,73 @@
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
)
from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permisions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
Rule(resources=["pods", "services"], verbs=["get", "list"]),
# Rule 2: Allows 'create' and 'delete' on 'deployments'
Rule(resources=["deployments"], verbs=["create", "delete"]),
]
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permisions(rules, resources, verbs)
def test_no_permissions(self):
# Test when there are no rules
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
rules = [
Rule(resources=["services"], verbs=["get", "list"]),
Rule(resources=["pods"], verbs=["create", "delete"]),
]
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
def test_empty_rules(self):
# Test when the rules list is empty
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = []
assert not is_rule_allowing_permisions(rules, resources, verbs)
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = ["get"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permisions(rules, resources, verbs)