feat(kubernetes): add Kubernetes provider (#3226)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Sergio Garcia
2024-01-09 10:31:51 +01:00
committed by GitHub
parent 0ef85b3dee
commit c2f8980f1f
45 changed files with 963 additions and 11 deletions
Generated
+29 -4
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -1339,6 +1339,32 @@ files = [
[package.dependencies]
six = "*"
[[package]]
name = "kubernetes"
version = "28.1.0"
description = "Kubernetes python client"
optional = false
python-versions = ">=3.6"
files = [
{file = "kubernetes-28.1.0-py2.py3-none-any.whl", hash = "sha256:10f56f8160dcb73647f15fafda268e7f60cf7dbc9f8e46d52fcd46d3beb0c18d"},
{file = "kubernetes-28.1.0.tar.gz", hash = "sha256:1468069a573430fb1cb5ad22876868f57977930f80a6749405da31cd6086a7e9"},
]
[package.dependencies]
certifi = ">=14.05.14"
google-auth = ">=1.0.1"
oauthlib = ">=3.2.2"
python-dateutil = ">=2.5.3"
pyyaml = ">=5.4.1"
requests = "*"
requests-oauthlib = "*"
six = ">=1.9.0"
urllib3 = ">=1.24.2,<2.0"
websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0"
[package.extras]
adal = ["adal (>=1.0.2)"]
[[package]]
name = "lazy-object-proxy"
version = "1.9.0"
@@ -2773,8 +2799,7 @@ files = [
{file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win32.whl", hash = "sha256:763d65baa3b952479c4e972669f679fe490eee058d5aa85da483ebae2009d231"},
{file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win_amd64.whl", hash = "sha256:d000f258cf42fec2b1bbf2863c61d7b8918d31ffee905da62dede869254d3b8a"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:045e0626baf1c52e5527bd5db361bc83180faaba2ff586e763d3d5982a876a9e"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:1a6391a7cabb7641c32517539ca42cf84b87b667bad38b78d4d42dd23e957c81"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:9c7617df90c1365638916b98cdd9be833d31d337dbcd722485597b43c4a215bf"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_12_6_arm64.whl", hash = "sha256:721bc4ba4525f53f6a611ec0967bdcee61b31df5a56801281027a3a6d1c2daf5"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:41d0f1fa4c6830176eef5b276af04c89320ea616655d01327d5ce65e50575c94"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win32.whl", hash = "sha256:f6d3d39611ac2e4f62c3128a9eed45f19a6608670c5a2f4f07f24e8de3441d38"},
{file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win_amd64.whl", hash = "sha256:da538167284de58a52109a9b89b8f6a53ff8437dd6dc26d33b57bf6699153122"},
@@ -3312,4 +3337,4 @@ docs = ["mkdocs", "mkdocs-material"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.12"
content-hash = "653c89aa68d9924b75c01a3dd894fdffc57fb899d46dcc23728e0543a3fc24e9"
content-hash = "5eeda2c0549c1a40ebedefe766f0d7e27e78ed123aaacb3e42d242271774b1da"
+3
View File
@@ -92,3 +92,6 @@ azure:
# GCP Configuration
gcp:
# Kubernetes Configuration
kubernetes:
@@ -13,3 +13,7 @@ CustomChecksMetadata:
Checks:
compute_instance_public_ip:
Severity: critical
kubernetes:
Checks:
apiserver_anonymous_requests:
Severity: low
+16
View File
@@ -146,6 +146,22 @@ class Check_Report_GCP(Check_Report):
self.location = ""
@dataclass
class Check_Report_Kubernetes(Check_Report):
# TODO change class name to CheckReportKubernetes
"""Contains the Kubernetes Check's finding information."""
resource_name: str
resource_id: str
namespace: str
def __init__(self, metadata):
super().__init__(metadata)
self.resource_name = ""
self.resource_id = ""
self.namespace = ""
# Testing Pending
def load_check_metadata(metadata_file: str) -> Check_Metadata_Model:
"""load_check_metadata loads and parse a Check's metadata file"""
+49
View File
@@ -21,6 +21,7 @@ from prowler.lib.utils.utils import open_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.azure.lib.audit_info.models import Azure_Audit_Info
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
from prowler.providers.kubernetes.lib.audit_info.models import Kubernetes_Audit_Info
def add_html_header(file_descriptor, audit_info):
@@ -522,6 +523,53 @@ def get_gcp_html_assessment_summary(audit_info):
sys.exit(1)
def get_kubernetes_html_assessment_summary(audit_info):
try:
if isinstance(audit_info, Kubernetes_Audit_Info):
return (
"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Kubernetes Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Kubernetes Context:</b> """
+ audit_info.context["name"]
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Kubernetes Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Kubernetes Cluster:</b> """
+ audit_info.context["context"]["cluster"]
+ """
</li>
<li class="list-group-item">
<b>Kubernetes User:</b> """
+ audit_info.context["context"]["user"]
+ """
</li>
</ul>
</div>
</div>
"""
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def get_assessment_summary(audit_info):
"""
get_assessment_summary gets the HTML assessment summary for the provider
@@ -532,6 +580,7 @@ def get_assessment_summary(audit_info):
# AWS_Audit_Info --> aws
# GCP_Audit_Info --> gcp
# Azure_Audit_Info --> azure
# Kubernetes_Audit_Info --> kubernetes
provider = audit_info.__class__.__name__.split("_")[0].lower()
# Dynamically get the Provider quick inventory handler
+36 -1
View File
@@ -85,6 +85,18 @@ def generate_provider_output_csv(
)
finding_output = output_model(**data)
if provider == "kubernetes":
data["resource_id"] = finding.resource_id
data["resource_name"] = finding.resource_name
data["namespace"] = finding.namespace
data[
"finding_unique_id"
] = f"prowler-{provider}-{finding.check_metadata.CheckID}-{finding.namespace}-{finding.resource_id}"
data["compliance"] = unroll_dict(
get_check_compliance(finding, provider, output_options)
)
finding_output = output_model(**data)
if provider == "aws":
data["profile"] = audit_info.profile
data["account_id"] = audit_info.audited_account
@@ -357,6 +369,16 @@ class Gcp_Check_Output_CSV(Check_Output_CSV):
resource_name: str = ""
class Kubernetes_Check_Output_CSV(Check_Output_CSV):
"""
Kubernetes_Check_Output_CSV generates a finding's output in CSV format for the Kubernetes provider.
"""
namespace: str = ""
resource_id: str = ""
resource_name: str = ""
def generate_provider_output_json(
provider: str, finding, audit_info, mode: str, output_options
):
@@ -487,7 +509,7 @@ class Azure_Check_Output_JSON(Check_Output_JSON):
class Gcp_Check_Output_JSON(Check_Output_JSON):
"""
Gcp_Check_Output_JSON generates a finding's output in JSON format for the AWS provider.
Gcp_Check_Output_JSON generates a finding's output in JSON format for the GCP provider.
"""
ProjectId: str = ""
@@ -499,6 +521,19 @@ class Gcp_Check_Output_JSON(Check_Output_JSON):
super().__init__(**metadata)
class Kubernetes_Check_Output_JSON(Check_Output_JSON):
"""
Kubernetes_Check_Output_JSON generates a finding's output in JSON format for the Kubernetes provider.
"""
ResourceId: str = ""
ResourceName: str = ""
Namespace: str = ""
def __init__(self, **metadata):
super().__init__(**metadata)
class Check_Output_MITRE_ATTACK(BaseModel):
"""
Check_Output_MITRE_ATTACK generates a finding's output in CSV MITRE ATTACK format.
+2
View File
@@ -27,6 +27,8 @@ def stdout_report(finding, color, verbose, is_quiet):
details = finding.check_metadata.ServiceName
if finding.check_metadata.Provider == "gcp":
details = finding.location.lower()
if finding.check_metadata.Provider == "kubernetes":
details = finding.namespace.lower()
if verbose and not (is_quiet and finding.status != "FAIL"):
print(
+3
View File
@@ -39,6 +39,9 @@ def display_summary_table(
elif provider == "gcp":
entity_type = "Project ID/s"
audited_entities = ", ".join(audit_info.project_ids)
elif provider == "kubernetes":
entity_type = "Context"
audited_entities = audit_info.context["name"]
if findings:
current = {
+37
View File
@@ -34,6 +34,9 @@ from prowler.providers.azure.lib.exception.exception import AzureException
from prowler.providers.gcp.gcp_provider import GCP_Provider
from prowler.providers.gcp.lib.audit_info.audit_info import gcp_audit_info
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
from prowler.providers.kubernetes.kubernetes_provider import Kubernetes_Provider
from prowler.providers.kubernetes.lib.audit_info.audit_info import kubernetes_audit_info
from prowler.providers.kubernetes.lib.audit_info.models import Kubernetes_Audit_Info
class Audit_Info:
@@ -56,6 +59,21 @@ class Audit_Info:
This report is being generated using credentials below:
GCP Account: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} GCP Project IDs: {Fore.YELLOW}[{", ".join(audit_info.project_ids)}]{Style.RESET_ALL}
"""
print(report)
def print_kubernetes_credentials(self, audit_info: Kubernetes_Audit_Info):
# Get the current context
cluster_name = self.context.get("context").get("cluster")
user_name = self.context.get("context").get("user")
namespace = self.context.get("namespace", "default")
roles = self.get_context_user_roles()
roles_str = ", ".join(roles) if roles else "No associated Roles"
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Cluster: {Fore.YELLOW}[{cluster_name}]{Style.RESET_ALL} User: {Fore.YELLOW}[{user_name}]{Style.RESET_ALL} Namespace: {Fore.YELLOW}[{namespace}]{Style.RESET_ALL} Roles: {Fore.YELLOW}[{roles_str}]{Style.RESET_ALL}
"""
print(report)
@@ -358,6 +376,25 @@ Azure Identity Type: {Fore.YELLOW}[{audit_info.identity.identity_type}]{Style.RE
return gcp_audit_info
def set_kubernetes_audit_info(self, arguments) -> Kubernetes_Audit_Info:
"""
set_kubernetes_audit_info returns the Kubernetes_Audit_Info
"""
logger.info("Setting Kubernetes session ...")
kubeconfig_file = arguments.get("kubeconfig_file")
logger.info("Checking if any context is set ...")
context = arguments.get("context")
kubernetes_provider = Kubernetes_Provider(kubeconfig_file, context)
(
kubernetes_audit_info.api_client,
kubernetes_audit_info.context,
) = kubernetes_provider.get_credentials()
return kubernetes_audit_info
def set_provider_audit_info(provider: str, arguments: dict):
"""
+17
View File
@@ -111,6 +111,23 @@ class Gcp_Output_Options(Provider_Output_Options):
self.output_filename = arguments.output_filename
class Kubernetes_Output_Options(Provider_Output_Options):
def __init__(self, arguments, audit_info, mutelist_file, bulk_checks_metadata):
# First call Provider_Output_Options init
super().__init__(arguments, mutelist_file, bulk_checks_metadata)
# TODO move the below if to Provider_Output_Options
# Check if custom output filename was input, if not, set the default
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = (
f"prowler-output-{audit_info.context['name']}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename
class Aws_Output_Options(Provider_Output_Options):
security_hub_enabled: bool
@@ -0,0 +1,90 @@
import os
import sys
from kubernetes import client, config
from prowler.lib.logger import logger
class Kubernetes_Provider:
def __init__(
self,
kubeconfig_file: str,
context: str,
):
logger.info("Instantiating Kubernetes Provider ...")
self.api_client, self.context = self.__set_credentials__(
kubeconfig_file, context
)
if not self.api_client:
logger.critical("Failed to set up a Kubernetes session.")
sys.exit(1)
def __set_credentials__(self, kubeconfig_file, context):
try:
if kubeconfig_file:
# Use kubeconfig file if provided
config.load_kube_config(
config_file=os.path.abspath(kubeconfig_file), context=context
)
else:
# Otherwise try to load in-cluster config
config.load_incluster_config()
context = config.list_kube_config_contexts()[0][0]
return client.ApiClient(), context
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_credentials(self):
return self.api_client, self.context
def search_and_save_roles(
self, roles: list, role_bindings, context_user: str, role_binding_type: str
):
try:
for rb in role_bindings:
if rb.subjects:
for subject in rb.subjects:
if subject.kind == "User" and subject.name == context_user:
if role_binding_type == "ClusterRole":
roles.append(f"{role_binding_type}: {rb.role_ref.name}")
elif role_binding_type == "Role":
roles.append(
f"{role_binding_type} ({rb.metadata.namespace}): {rb.role_ref.name}"
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_context_user_roles(self):
try:
rbac_api = client.RbacAuthorizationV1Api()
context_user = self.context.get("context", {}).get("user", "")
roles = []
# Search in ClusterRoleBindings
roles = self.search_and_save_roles(
roles,
rbac_api.list_cluster_role_binding().items,
context_user,
"ClusterRole",
)
# Search in RoleBindings for all namespaces
roles = self.search_and_save_roles(
roles,
rbac_api.list_role_binding_for_all_namespaces().items,
context_user,
"Role",
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
@@ -0,0 +1,113 @@
import os
import sys
from typing import Any, Optional
from colorama import Fore, Style
from kubernetes import client, config
from prowler.lib.logger import logger
from prowler.providers.common.provider import CloudProvider
class KubernetesProvider(CloudProvider):
# TODO change class name from CloudProvider to Provider
api_client: Any
context: dict
audit_resources: Optional[Any]
audit_metadata: Optional[Any]
audit_config: Optional[dict]
def __init__(self, arguments: dict):
logger.info("Instantiating Kubernetes Provider ...")
self.api_client, self.context = self.setup_session(
arguments.kubeconfig_file, arguments.context
)
if not self.api_client:
logger.critical("Failed to set up a Kubernetes session.")
sys.exit(1)
if not arguments.only_logs:
self.print_credentials()
def setup_session(self, kubeconfig_file, context):
try:
if kubeconfig_file:
# Use kubeconfig file if provided
config.load_kube_config(
config_file=os.path.abspath(kubeconfig_file), context=context
)
else:
# Otherwise try to load in-cluster config
config.load_incluster_config()
context = config.list_kube_config_contexts()[0][0]
return client.ApiClient(), context
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def search_and_save_roles(
self, roles: list, role_bindings, context_user: str, role_binding_type: str
):
try:
for rb in role_bindings:
if rb.subjects:
for subject in rb.subjects:
if subject.kind == "User" and subject.name == context_user:
if role_binding_type == "ClusterRole":
roles.append(f"{role_binding_type}: {rb.role_ref.name}")
elif role_binding_type == "Role":
roles.append(
f"{role_binding_type} ({rb.metadata.namespace}): {rb.role_ref.name}"
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_context_user_roles(self):
try:
rbac_api = client.RbacAuthorizationV1Api()
context_user = self.context.get("context", {}).get("user", "")
roles = []
# Search in ClusterRoleBindings
roles = self.search_and_save_roles(
roles,
rbac_api.list_cluster_role_binding().items,
context_user,
"ClusterRole",
)
# Search in RoleBindings for all namespaces
roles = self.search_and_save_roles(
roles,
rbac_api.list_role_binding_for_all_namespaces().items,
context_user,
"Role",
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def print_credentials(self):
# Get the current context
cluster_name = self.context.get("context").get("cluster")
user_name = self.context.get("context").get("user")
namespace = self.context.get("namespace", "default")
roles = self.get_context_user_roles()
roles_str = ", ".join(roles) if roles else "No associated Roles"
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Cluster: {Fore.YELLOW}[{cluster_name}]{Style.RESET_ALL} User: {Fore.YELLOW}[{user_name}]{Style.RESET_ALL} Namespace: {Fore.YELLOW}[{namespace}]{Style.RESET_ALL} Roles: {Fore.YELLOW}[{roles_str}]{Style.RESET_ALL}
"""
print(report)
@@ -0,0 +1,21 @@
def init_parser(self):
"""Init the Kubernetes Provider CLI parser"""
k8s_parser = self.subparsers.add_parser(
"kubernetes", parents=[self.common_providers_parser], help="Kubernetes Provider"
)
# Authentication and Configuration
k8s_auth_subparser = k8s_parser.add_argument_group(
"Authentication and Configuration"
)
k8s_auth_subparser.add_argument(
"--kubeconfig-file",
nargs="?",
metavar="FILE_PATH",
help="Path to the kubeconfig file to use for CLI requests. Not necessary for in-cluster execution.",
)
k8s_auth_subparser.add_argument(
"--context",
nargs="?",
metavar="CONTEXT_NAME",
help="The name of the kubeconfig context to use. By default, current_context from config file will be used.",
)
@@ -0,0 +1,9 @@
from prowler.providers.kubernetes.lib.audit_info.models import Kubernetes_Audit_Info
kubernetes_audit_info = Kubernetes_Audit_Info(
api_client=None,
context=None,
audit_resources=None,
audit_metadata=None,
audit_config=None,
)
@@ -0,0 +1,27 @@
from dataclasses import dataclass
from typing import Any, Optional
from kubernetes import client
@dataclass
class Kubernetes_Audit_Info:
api_client: client.ApiClient
context: Optional[str]
audit_resources: Optional[Any]
audit_metadata: Optional[Any]
audit_config: Optional[dict]
def __init__(
self,
api_client,
context,
audit_metadata,
audit_resources,
audit_config,
):
self.api_client = api_client
self.context = context
self.audit_metadata = audit_metadata
self.audit_resources = audit_resources
self.audit_config = audit_config
@@ -0,0 +1,40 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider_new import KubernetesProvider
MAX_WORKERS = 10
class KubernetesService:
def __init__(self, provider: KubernetesProvider):
self.context = provider.context
self.api_client = provider.api_client
# Thread pool for __threading_call__
self.thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
def __threading_call__(self, call, iterator):
items = iterator
# Determine the total count for logging
item_count = len(items)
# Trim leading and trailing underscores from the call's name
call_name = call.__name__.strip("_")
# Add Capitalization
call_name = " ".join([x.capitalize() for x in call_name.split("_")])
logger.info(
f"{self.service.upper()} - Starting threads for '{call_name}' function to process {item_count} items..."
)
# Submit tasks to the thread pool
futures = [self.thread_pool.submit(call, item) for item in items]
# Wait for all tasks to complete
for future in as_completed(futures):
try:
future.result() # Raises exceptions from the thread, if any
except Exception:
# Handle exceptions if necessary
pass # Replace 'pass' with any additional exception handling logic. Currently handled within the called function
@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "apiserver_anonymous_requests",
"CheckTitle": "Ensure that the --anonymous-auth argument is set to false",
"CheckType": [
"Cluster Security",
"Authentication and Authorization"
],
"ServiceName": "apiserver",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "KubernetesAPIServer",
"Description": "Disable anonymous requests to the API server. When enabled, requests that are not rejected by other configured authentication methods are treated as anonymous requests, which are then served by the API server. Disallowing anonymous requests strengthens security by ensuring all access is authenticated.",
"Risk": "Enabling anonymous access to the API server can expose the cluster to unauthorized access and potential security vulnerabilities.",
"RelatedUrl": "https://kubernetes.io/docs/admin/authentication/#anonymous-requests",
"Remediation": {
"Code": {
"CLI": "Edit the API server pod specification file /etc/kubernetes/manifests/kube-apiserver.yaml and set --anonymous-auth=false",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure the --anonymous-auth argument in the API server is set to false. This will reject all anonymous requests, enforcing authenticated access to the server.",
"Url": "https://kubernetes.io/docs/reference/command-line-tools-reference/kube-apiserver/"
}
},
"Categories": [
"Security Best Practices",
"Compliance"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "While anonymous access can be useful for health checks and discovery, consider the security implications for your specific environment."
}
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.apiserver.apiserver_client import (
apiserver_client,
)
class apiserver_anonymous_requests(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for pod in apiserver_client.apiserver_pods:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = pod.namespace
report.resource_name = pod.name
report.resource_id = pod.uid
report.status = "PASS"
report.status_extended = "API Server does not have anonymous-auth enabled."
for container in pod.containers.values():
if "--anonymous-auth=true" in container.command:
report.resource_id = container.name
report.status = "FAIL"
report.status_extended = f"API Server has anonymous-auth enabled in container {container.name}."
findings.append(report)
return findings
@@ -0,0 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.kubernetes.services.apiserver.apiserver_service import APIServer
apiserver_client = APIServer(global_provider)
@@ -0,0 +1,26 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## APIServer ##################
class APIServer(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
self.client = core_client
self.apiserver_pods = self.__get_apiserver_pod__()
def __get_apiserver_pod__(self):
try:
apiserver_pods = []
for pod in self.client.pods.values():
if pod.namespace == "kube-system" and pod.name.startswith(
"kube-apiserver"
):
apiserver_pods.append(pod)
return apiserver_pods
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@@ -0,0 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.kubernetes.services.core.core_service import Core
core_client = Core(global_provider)
@@ -0,0 +1,80 @@
from typing import List, Optional
from kubernetes import client
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.kubernetes.lib.service.service import KubernetesService
################## Core ##################
class Core(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
self.client = client.CoreV1Api(self.api_client)
self.pods = {}
self.__get_pods__()
def __get_pods__(self):
try:
pods = self.client.list_pod_for_all_namespaces()
for pod in pods.items:
pod_containers = {}
for container in pod.spec.containers:
pod_containers[container.name] = Container(
name=container.name,
image=container.image,
command=container.command if container.command else None,
ports=[
{"containerPort": port.container_port}
for port in container.ports
]
if container.ports
else None,
env=[
{"name": env.name, "value": env.value}
for env in container.env
]
if container.env
else None,
)
self.pods[pod.metadata.uid] = Pod(
name=pod.metadata.name,
uid=pod.metadata.uid,
namespace=pod.metadata.namespace,
labels=pod.metadata.labels,
annotations=pod.metadata.annotations,
node_name=pod.spec.node_name,
service_account=pod.spec.service_account_name,
status_phase=pod.status.phase,
pod_ip=pod.status.pod_ip,
host_ip=pod.status.host_ip,
containers=pod_containers,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Container(BaseModel):
name: str
image: str
command: Optional[List[str]]
ports: Optional[List[dict]]
env: Optional[List[dict]]
class Pod(BaseModel):
name: str
uid: str
namespace: str
labels: Optional[dict]
annotations: Optional[dict]
node_name: Optional[str]
service_account: Optional[str]
status_phase: Optional[str]
pod_ip: Optional[str]
host_ip: Optional[str]
containers: Optional[dict]
@@ -0,0 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.kubernetes.services.scheduler.scheduler_service import Scheduler
scheduler_client = Scheduler(global_provider)
@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "scheduler_profiling",
"CheckTitle": "Ensure that the --profiling argument is set to false",
"CheckType": [
"Cluster Performance",
"Cluster Security"
],
"ServiceName": "kube-scheduler",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "KubernetesScheduler",
"Description": "Disable profiling in the Kubernetes Scheduler unless it is needed for troubleshooting. Profiling can reveal detailed system and application performance data, which might be exploited if exposed. Turning off profiling reduces the potential attack surface and performance overhead.",
"Risk": "While profiling is useful for identifying performance issues, it generates detailed data that could potentially expose sensitive information about the system and its performance characteristics.",
"RelatedUrl": "https://github.com/kubernetes/community/blob/master/contributors/devel/profiling.md",
"Remediation": {
"Code": {
"CLI": "Edit the Scheduler pod specification file /etc/kubernetes/manifests/kube-scheduler.yaml and set --profiling=false",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "To minimize exposure to performance data and potential vulnerabilities, ensure the --profiling argument in the Kubernetes Scheduler is set to false.",
"Url": "https://kubernetes.io/docs/admin/kube-scheduler/"
}
},
"Categories": [
"Performance Optimization",
"Security Best Practices"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "By default, profiling is enabled in Kubernetes Scheduler. Disabling it is a good security practice if profiling data is not needed for regular operations."
}
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.scheduler.scheduler_client import (
scheduler_client,
)
class scheduler_profiling(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
for pod in scheduler_client.scheduler_pods:
report = Check_Report_Kubernetes(self.metadata())
report.namespace = pod.namespace
report.resource_name = pod.name
report.resource_id = pod.uid
report.status = "PASS"
report.status_extended = "Scheduler does not have profiling enabled."
for container in pod.containers.values():
if "--profiling=true" in container.command:
report.resource_id = container.name
report.status = "FAIL"
report.status_extended = f"Scheduler has profiling enabled in container {container.name}."
findings.append(report)
return findings
@@ -0,0 +1,26 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## Scheduler ##################
class Scheduler(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
self.client = core_client
self.scheduler_pods = self.__get_scheduler_pod__()
def __get_scheduler_pod__(self):
try:
scheduler_pods = []
for pod in self.client.pods.values():
if pod.namespace == "kube-system" and pod.name.startswith(
"kube-scheduler"
):
scheduler_pods.append(pod)
return scheduler_pods
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
+1
View File
@@ -41,6 +41,7 @@ detect-secrets = "1.4.0"
google-api-python-client = "2.108.0"
google-auth-httplib2 = "^0.1.0"
jsonschema = "4.18.0"
kubernetes = "^28.1.0"
mkdocs = {version = "1.5.3", optional = true}
mkdocs-material = {version = "9.4.10", optional = true}
msgraph-core = "0.2.2"
+8 -1
View File
@@ -54,7 +54,7 @@ config_aws = {
class Test_Config:
def test_get_aws_available_regions(self):
assert len(get_aws_available_regions()) == 32
assert len(get_aws_available_regions()) == 33
@mock.patch(
"prowler.config.config.requests.get", new=mock_prowler_get_latest_release
@@ -179,6 +179,13 @@ class Test_Config:
assert load_and_validate_config_file(provider, config_test_file) is None
def test_load_and_validate_config_file_kubernetes(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config.yaml"
provider = "kubernetes"
assert load_and_validate_config_file(provider, config_test_file) is None
def test_load_and_validate_config_file_azure(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config.yaml"
+3
View File
@@ -59,3 +59,6 @@ azure:
# GCP Configuration
gcp:
# Kubernetes Configuration
kubernetes:
@@ -21,6 +21,7 @@ CUSTOM_CHECKS_METADATA_FIXTURE_FILE_NOT_VALID = f"{os.path.dirname(os.path.realp
AWS_PROVIDER = "aws"
AZURE_PROVIDER = "azure"
GCP_PROVIDER = "gcp"
KUBERNETES_PROVIDER = "kubernetes"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_NAME = "s3_bucket_level_public_access_block"
S3_BUCKET_LEVEL_PUBLIC_ACCESS_BLOCK_SEVERITY = "medium"
@@ -81,6 +82,11 @@ class TestCustomChecksMetadata:
GCP_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE
) == {"Checks": {"bigquery_dataset_cmk_encryption": {"Severity": "low"}}}
def test_parse_custom_checks_metadata_file_for_kubernetes(self):
assert parse_custom_checks_metadata_file(
KUBERNETES_PROVIDER, CUSTOM_CHECKS_METADATA_FIXTURE_FILE
) == {"Checks": {"bigquery_dataset_cmk_encryption": {"Severity": "low"}}}
def test_parse_custom_checks_metadata_file_for_aws_validation_error(self, caplog):
caplog.set_level(logging.CRITICAL)
@@ -13,3 +13,7 @@ CustomChecksMetadata:
Checks:
bigquery_dataset_cmk_encryption:
Severity: low
kubernetes:
Checks:
apiserver_anonymous_requests:
Severity: low
+58 -4
View File
@@ -11,11 +11,11 @@ prowler_command = "prowler"
# capsys
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
prowler_default_usage_error = "usage: prowler [-h] [-v] {aws,azure,gcp} ..."
prowler_default_usage_error = "usage: prowler [-h] [-v] {aws,azure,gcp,kubernetes} ..."
def mock_get_available_providers():
return ["aws", "azure", "gcp"]
return ["aws", "azure", "gcp", "kubernetes"]
class Test_Parser:
@@ -153,6 +153,41 @@ class Test_Parser:
assert not parsed.list_categories
assert not parsed.credentials_file
def test_default_parser_no_arguments_kubernetes(self):
provider = "kubernetes"
command = [prowler_command, provider]
parsed = self.parser.parse(command)
assert parsed.provider == provider
assert not parsed.quiet
assert len(parsed.output_modes) == 4
assert "csv" in parsed.output_modes
assert "html" in parsed.output_modes
assert "json" in parsed.output_modes
assert not parsed.output_filename
assert "output" in parsed.output_directory
assert not parsed.verbose
assert not parsed.no_banner
assert not parsed.slack
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.checks_folder
assert not parsed.services
assert not parsed.severity
assert not parsed.compliance
assert len(parsed.categories) == 0
assert not parsed.excluded_checks
assert not parsed.excluded_services
assert not parsed.list_checks
assert not parsed.list_services
assert not parsed.list_compliance
assert not parsed.list_compliance_requirements
assert not parsed.list_categories
assert not parsed.credentials_file
def test_root_parser_version_short(self):
command = [prowler_command, "-v"]
with pytest.raises(SystemExit) as wrapped_exit:
@@ -194,15 +229,18 @@ class Test_Parser:
def test_root_parser_azure_provider(self):
command = [prowler_command, "azure"]
parsed = self.parser.parse(command)
print(parsed)
assert parsed.provider == "azure"
def test_root_parser_gcp_provider(self):
command = [prowler_command, "gcp"]
parsed = self.parser.parse(command)
print(parsed)
assert parsed.provider == "gcp"
def test_root_parser_kubernetes_provider(self):
command = [prowler_command, "kubernetes"]
parsed = self.parser.parse(command)
assert parsed.provider == "kubernetes"
def test_root_parser_quiet_short(self):
command = [prowler_command, "-q"]
parsed = self.parser.parse(command)
@@ -1096,6 +1134,22 @@ class Test_Parser:
assert parsed.project_ids[0] == project_1
assert parsed.project_ids[1] == project_2
def test_parser_kubernetes_auth_kubeconfig_file(self):
argument = "--kubeconfig-file"
file = "config"
command = [prowler_command, "kubernetes", argument, file]
parsed = self.parser.parse(command)
assert parsed.provider == "kubernetes"
assert parsed.kubeconfig_file == file
def test_parser_kubernetes_auth_context(self):
argument = "--context"
context = "default"
command = [prowler_command, "kubernetes", argument, context]
parsed = self.parser.parse(command)
assert parsed.provider == "kubernetes"
assert parsed.context == context
def test_validate_azure_region_valid_regions(self):
expected_regions = [
"AzureChinaCloud",
+34
View File
@@ -22,6 +22,8 @@ from prowler.providers.common.audit_info import (
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.gcp.gcp_provider import GCP_Provider
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
from prowler.providers.kubernetes.kubernetes_provider import Kubernetes_Provider
from prowler.providers.kubernetes.lib.audit_info.models import Kubernetes_Audit_Info
EXAMPLE_AMI_ID = "ami-12c6146b"
AWS_ACCOUNT_NUMBER = "123456789012"
@@ -93,6 +95,14 @@ def mock_get_project_ids(*_):
return ["project"]
def mock_set_kubernetes_credentials(*_):
return ("apiclient", "context")
def mock_get_context_user_roles(*_):
return []
class Test_Set_Audit_Info:
# Mocked Audit Info
def set_mocked_audit_info(self):
@@ -278,6 +288,30 @@ class Test_Set_Audit_Info:
audit_info = set_provider_audit_info(provider, arguments)
assert isinstance(audit_info, GCP_Audit_Info)
@patch.object(
Kubernetes_Provider, "__set_credentials__", new=mock_set_kubernetes_credentials
)
@patch.object(
Kubernetes_Provider, "get_context_user_roles", new=mock_get_context_user_roles
)
def test_set_audit_info_kubernetes(self):
provider = "kubernetes"
arguments = {
"profile": None,
"role": None,
"session_duration": None,
"external_id": None,
"regions": None,
"organizations_role": None,
"subscriptions": None,
"context": "default",
"kubeconfig_file": "config",
"config_file": default_config_file_path,
}
audit_info = set_provider_audit_info(provider, arguments)
assert isinstance(audit_info, Kubernetes_Audit_Info)
@mock_resourcegroupstaggingapi
@mock_ec2
def test_get_tagged_resources(self):
+91 -1
View File
@@ -16,10 +16,12 @@ from prowler.providers.common.outputs import (
Aws_Output_Options,
Azure_Output_Options,
Gcp_Output_Options,
Kubernetes_Output_Options,
get_provider_output_model,
set_provider_output_options,
)
from prowler.providers.gcp.lib.audit_info.models import GCP_Audit_Info
from prowler.providers.kubernetes.lib.audit_info.models import Kubernetes_Audit_Info
AWS_ACCOUNT_NUMBER = "012345678912"
DATETIME = "20230101120000"
@@ -51,6 +53,20 @@ class Test_Common_Output_Options:
)
return audit_info
# Mocked Kusbernete Audit Info
def set_mocked_kubernetes_audit_info(self):
audit_info = Kubernetes_Audit_Info(
api_client=None,
context={
"name": "test-context",
"context": {"cluster": "test-cluster", "user": "XXXXXXXXX"},
},
audit_resources=None,
audit_metadata=None,
audit_config=None,
)
return audit_info
# Mocked AWS Audit Info
def set_mocked_aws_audit_info(self):
audit_info = AWS_Audit_Info(
@@ -147,6 +163,37 @@ class Test_Common_Output_Options:
# Delete testing directory
rmdir(arguments.output_directory)
def test_set_provider_output_options_kubernetes(self):
# Set the cloud provider
provider = "kubernetes"
# Set the arguments passed
arguments = Namespace()
arguments.quiet = True
arguments.output_modes = ["html", "csv", "json"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.output_filename = "output_test_filename"
arguments.only_logs = False
arguments.unix_timestamp = False
audit_info = self.set_mocked_kubernetes_audit_info()
mutelist_file = ""
bulk_checks_metadata = {}
output_options = set_provider_output_options(
provider, arguments, audit_info, mutelist_file, bulk_checks_metadata
)
assert isinstance(output_options, Kubernetes_Output_Options)
assert output_options.is_quiet
assert output_options.output_modes == ["html", "csv", "json"]
assert output_options.output_directory == arguments.output_directory
assert output_options.mutelist_file == ""
assert output_options.bulk_checks_metadata == {}
assert output_options.verbose
assert output_options.output_filename == arguments.output_filename
# Delete testing directory
rmdir(arguments.output_directory)
def test_set_provider_output_options_aws_no_output_filename(self):
# Set the cloud provider
provider = "aws"
@@ -362,7 +409,7 @@ class Test_Common_Output_Options:
)
def test_gcp_get_assessment_summary(self):
# Mock Azure Audit Info
# Mock GCP Audit Info
audit_info = self.set_mocked_gcp_audit_info()
profile = "default"
assert (
@@ -395,11 +442,54 @@ class Test_Common_Output_Options:
"""
)
def test_kubernetes_get_assessment_summary(self):
# Mock Kubernetes Audit Info
audit_info = self.set_mocked_kubernetes_audit_info()
assert (
get_assessment_summary(audit_info)
== """
<div class="col-md-2">
<div class="card">
<div class="card-header">
Kubernetes Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Kubernetes Context:</b> """
+ audit_info.context["name"]
+ """
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Kubernetes Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Kubernetes Cluster:</b> """
+ audit_info.context["context"]["cluster"]
+ """
</li>
<li class="list-group-item">
<b>Kubernetes User:</b> """
+ audit_info.context["context"]["user"]
+ """
</li>
</ul>
</div>
</div>
"""
)
def test_get_provider_output_model(self):
audit_info_class_names = [
"AWS_Audit_Info",
"GCP_Audit_Info",
"Azure_Audit_Info",
"Kubernetes_Audit_Info",
]
for class_name in audit_info_class_names:
provider_prefix = class_name.split("_", 1)[0].lower().capitalize()