chore(kubernetes): Working provider (#3475)

Co-authored-by: Sergio Garcia <sergargar1@gmail.com>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Pepe Fagoaga
2024-03-01 14:10:10 +01:00
committed by GitHub
parent be55fa22fd
commit bd17d36e7f
18 changed files with 162 additions and 291 deletions
+4 -2
View File
@@ -120,7 +120,7 @@ class Gcp_Output_Options(Provider_Output_Options):
class Kubernetes_Output_Options(Provider_Output_Options):
def __init__(self, arguments, audit_info, mutelist_file, bulk_checks_metadata):
def __init__(self, arguments, identity, mutelist_file, bulk_checks_metadata):
# First call Provider_Output_Options init
super().__init__(arguments, mutelist_file, bulk_checks_metadata)
# TODO move the below if to Provider_Output_Options
@@ -129,7 +129,9 @@ class Kubernetes_Output_Options(Provider_Output_Options):
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = f"prowler-output-{audit_info.context['name'].replace(':', '_').replace('/', '_')}-{output_file_timestamp}"
self.output_filename = (
f"prowler-output-{identity.active_context}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename
@@ -1,53 +1,83 @@
import os
import sys
from argparse import Namespace
from dataclasses import dataclass
from typing import Any, Optional
from colorama import Fore, Style
from kubernetes import client, config
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider
class Kubernetes_Provider:
def __init__(self, kubeconfig_file: str, context: str, namespaces: str):
@dataclass
class KubernetesIdentityInfo:
active_context: str
class KubernetesProvider(Provider):
provider = "kubernetes"
api_client: Any
context: dict
namespaces: list
audit_resources: Optional[Any]
audit_metadata: Optional[Any]
audit_config: Optional[dict]
identity: KubernetesIdentityInfo
def __init__(self, arguments: Namespace):
"""
Initializes the KubernetesProvider instance.
Args:
arguments (dict): A dictionary containing configuration arguments.
"""
logger.info("Instantiating Kubernetes Provider ...")
self.api_client, self.context = self.__set_credentials__(
kubeconfig_file, context
self.api_client, self.context = self.setup_session(
arguments.kubeconfig_file, arguments.context
)
if not self.api_client:
logger.critical("Failed to set up a Kubernetes session.")
sys.exit(1)
if not namespaces:
if not arguments.namespaces:
logger.info("Retrieving all namespaces ...")
self.namespaces = self.get_all_namespaces()
else:
self.namespaces = namespaces
self.namespaces = arguments.namespaces
def __set_credentials__(self, kubeconfig_file, input_context):
if not self.api_client:
logger.critical("Failed to set up a Kubernetes session.")
sys.exit(1)
self.identity = KubernetesIdentityInfo(
active_context=self.context["name"].replace(":", "_").replace("/", "_")
)
if not arguments.only_logs:
self.print_credentials()
def setup_session(self, kubeconfig_file, input_context):
"""
Set up credentials for Kubernetes provider.
Sets up the Kubernetes session.
:param kubeconfig_file: Path to kubeconfig file.
:param input_context: Context to be used for Kubernetes session.
:return: Tuple containing ApiClient and context information.
Args:
kubeconfig_file (str): Path to the kubeconfig file.
input_context (str): Context name.
Returns:
Tuple: A tuple containing the API client and the context.
"""
try:
if kubeconfig_file:
# Use kubeconfig file if provided
logger.info(f"Loading kubeconfig from file: {kubeconfig_file} ...")
logger.info(f"Using kubeconfig file: {kubeconfig_file}")
config.load_kube_config(
config_file=os.path.abspath(kubeconfig_file), context=input_context
)
# Set context if input in argument
if input_context:
contexts = config.list_kube_config_contexts()[0]
for context_item in contexts:
if context_item["name"] == input_context:
context = context_item
else:
# Get active context
context = config.list_kube_config_contexts()[1]
else:
# Otherwise try to load in-cluster config
logger.info("Loading in-cluster config ...")
logger.info("Using in-cluster config")
config.load_incluster_config()
context = {
"name": "In-Cluster",
@@ -63,43 +93,20 @@ class Kubernetes_Provider:
)
sys.exit(1)
def get_credentials(self):
"""
Get Kubernetes API client and context.
:return: Tuple containing ApiClient and context information.
"""
return self.api_client, self.context
def get_all_namespaces(self):
"""
Retrieve a list of all namespaces from a Kubernetes cluster.
:return: List of namespaces.
"""
try:
v1 = client.CoreV1Api()
namespace_list = v1.list_namespace(timeout_seconds=2, _request_timeout=2)
namespaces = [item.metadata.name for item in namespace_list.items]
logger.info("All namespaces retrieved successfully.")
return namespaces
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit()
def search_and_save_roles(
self, roles: list, role_bindings, context_user: str, role_binding_type: str
):
"""
Search and save roles based on role bindings and context user.
Searches for and saves roles.
:param roles: List of roles to save.
:param role_bindings: List of role bindings to search.
:param context_user: Context user to match.
:param role_binding_type: Type of role binding.
:return: Updated list of roles.
Args:
roles (list): A list to save the roles.
role_bindings: Role bindings.
context_user (str): Context user.
role_binding_type (str): Role binding type.
Returns:
list: A list containing the roles.
"""
try:
for rb in role_bindings:
@@ -121,9 +128,10 @@ class Kubernetes_Provider:
def get_context_user_roles(self):
"""
Get roles assigned to the context user.
Retrieves the context user roles.
:return: List of roles assigned to the context user.
Returns:
list: A list containing the context user roles.
"""
try:
rbac_api = client.RbacAuthorizationV1Api()
@@ -151,3 +159,62 @@ class Kubernetes_Provider:
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_all_namespaces(self):
"""
Retrieves all namespaces.
Returns:
list: A list containing all namespace names.
"""
try:
v1 = client.CoreV1Api()
namespace_list = v1.list_namespace(timeout_seconds=2, _request_timeout=2)
namespaces = [item.metadata.name for item in namespace_list.items]
logger.info("All namespaces retrieved successfully.")
return namespaces
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit()
def get_pod_current_namespace(self):
"""
Retrieves the current namespace from the pod's mounted service account info.
Returns:
str: The current namespace.
"""
try:
with open(
"/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
) as f:
return f.read().strip()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return "default"
def print_credentials(self):
"""
Prints the Kubernetes credentials.
"""
if self.context.get("name") == "In-Cluster":
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Pod: {Fore.YELLOW}[prowler]{Style.RESET_ALL} Namespace: {Fore.YELLOW}[{self.get_pod_current_namespace()}]{Style.RESET_ALL}
"""
print(report)
else:
cluster_name = self.context.get("context").get("cluster")
user_name = self.context.get("context").get("user")
roles = self.get_context_user_roles()
roles_str = ", ".join(roles) if roles else "No associated Roles"
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Cluster: {Fore.YELLOW}[{cluster_name}]{Style.RESET_ALL} User: {Fore.YELLOW}[{user_name}]{Style.RESET_ALL} Namespaces: {Fore.YELLOW}[{', '.join(self.namespaces)}]{Style.RESET_ALL} Roles: {Fore.YELLOW}[{roles_str}]{Style.RESET_ALL}
"""
print(report)
@@ -1,205 +0,0 @@
import os
import sys
from argparse import Namespace
from typing import Any, Optional
from colorama import Fore, Style
from kubernetes import client, config
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider
class KubernetesProvider(Provider):
api_client: Any
context: dict
namespaces: list
audit_resources: Optional[Any]
audit_metadata: Optional[Any]
audit_config: Optional[dict]
def __init__(self, arguments: Namespace):
"""
Initializes the KubernetesProvider instance.
Args:
arguments (dict): A dictionary containing configuration arguments.
"""
logger.info("Instantiating Kubernetes Provider ...")
self.api_client, self.context = self.setup_session(
arguments.kubeconfig_file, arguments.context
)
if not arguments.namespaces:
logger.info("Retrieving all namespaces ...")
self.namespaces = self.get_all_namespaces()
else:
self.namespaces = arguments.namespaces
if not self.api_client:
logger.critical("Failed to set up a Kubernetes session.")
sys.exit(1)
if not arguments.only_logs:
self.print_credentials()
def setup_session(self, kubeconfig_file, input_context):
"""
Sets up the Kubernetes session.
Args:
kubeconfig_file (str): Path to the kubeconfig file.
input_context (str): Context name.
Returns:
Tuple: A tuple containing the API client and the context.
"""
try:
if kubeconfig_file:
logger.info(f"Using kubeconfig file: {kubeconfig_file}")
config.load_kube_config(
config_file=os.path.abspath(kubeconfig_file), context=input_context
)
if input_context:
contexts = config.list_kube_config_contexts()[0]
for context_item in contexts:
if context_item["name"] == input_context:
context = context_item
else:
context = config.list_kube_config_contexts()[1]
else:
logger.info("Using in-cluster config")
config.load_incluster_config()
context = {
"name": "In-Cluster",
"context": {
"cluster": "in-cluster", # Placeholder, as the real cluster name is not available
"user": "service-account-name", # Also a placeholder
},
}
return client.ApiClient(), context
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def search_and_save_roles(
self, roles: list, role_bindings, context_user: str, role_binding_type: str
):
"""
Searches for and saves roles.
Args:
roles (list): A list to save the roles.
role_bindings: Role bindings.
context_user (str): Context user.
role_binding_type (str): Role binding type.
Returns:
list: A list containing the roles.
"""
try:
for rb in role_bindings:
if rb.subjects:
for subject in rb.subjects:
if subject.kind == "User" and subject.name == context_user:
if role_binding_type == "ClusterRole":
roles.append(f"{role_binding_type}: {rb.role_ref.name}")
elif role_binding_type == "Role":
roles.append(
f"{role_binding_type} ({rb.metadata.namespace}): {rb.role_ref.name}"
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_context_user_roles(self):
"""
Retrieves the context user roles.
Returns:
list: A list containing the context user roles.
"""
try:
rbac_api = client.RbacAuthorizationV1Api()
context_user = self.context.get("context", {}).get("user", "")
roles = []
roles = self.search_and_save_roles(
roles,
rbac_api.list_cluster_role_binding().items,
context_user,
"ClusterRole",
)
roles = self.search_and_save_roles(
roles,
rbac_api.list_role_binding_for_all_namespaces().items,
context_user,
"Role",
)
return roles
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
def get_all_namespaces(self):
"""
Retrieves all namespaces.
Returns:
list: A list containing all namespace names.
"""
try:
v1 = client.CoreV1Api()
namespace_list = v1.list_namespace(timeout_seconds=2, _request_timeout=2)
namespaces = [item.metadata.name for item in namespace_list.items]
logger.info("All namespaces retrieved successfully.")
return namespaces
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit()
def get_pod_current_namespace(self):
"""
Retrieves the current namespace from the pod's mounted service account info.
Returns:
str: The current namespace.
"""
try:
with open(
"/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
) as f:
return f.read().strip()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return "default"
def print_credentials(self):
"""
Prints the Kubernetes credentials.
"""
if self.context.get("name") == "In-Cluster":
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Pod: {Fore.YELLOW}[prowler]{Style.RESET_ALL} Namespace: {Fore.YELLOW}[{self.get_pod_current_namespace()}]{Style.RESET_ALL}
"""
print(report)
else:
cluster_name = self.context.get("context").get("cluster")
user_name = self.context.get("context").get("user")
roles = self.get_context_user_roles()
roles_str = ", ".join(roles) if roles else "No associated Roles"
report = f"""
This report is being generated using the Kubernetes configuration below:
Kubernetes Cluster: {Fore.YELLOW}[{cluster_name}]{Style.RESET_ALL} User: {Fore.YELLOW}[{user_name}]{Style.RESET_ALL} Namespaces: {Fore.YELLOW}[{', '.join(self.namespaces)}]{Style.RESET_ALL} Roles: {Fore.YELLOW}[{roles_str}]{Style.RESET_ALL}
"""
print(report)
@@ -1,7 +1,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider_new import KubernetesProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
MAX_WORKERS = 10
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.apiserver.apiserver_service import APIServer
apiserver_client = APIServer(global_provider)
apiserver_client = APIServer(get_global_provider())
@@ -1,12 +1,13 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## APIServer ##################
class APIServer(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = core_client
self.apiserver_pods = self.__get_apiserver_pods__()
@@ -1,6 +1,6 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.controllermanager.controllermanager_service import (
ControllerManager,
)
controllermanager_client = ControllerManager(global_provider)
controllermanager_client = ControllerManager(get_global_provider())
@@ -1,12 +1,13 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## ControllerManager ##################
class ControllerManager(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = core_client
self.controllermanager_pods = self.__get_controllermanager_pods__()
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.core.core_service import Core
core_client = Core(global_provider)
core_client = Core(get_global_provider())
@@ -7,15 +7,16 @@ from kubernetes.client.models import V1PodSecurityContext, V1SecurityContext
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
################## Core ##################
class Core(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = client.CoreV1Api(self.api_client)
self.namespaces = audit_info.namespaces
self.namespaces = provider.namespaces
self.pods = {}
self.__get_pods__()
self.config_maps = {}
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.etcd.etcd_service import Etcd
etcd_client = Etcd(global_provider)
etcd_client = Etcd(get_global_provider())
@@ -1,12 +1,13 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## Etcd ##################
class Etcd(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = core_client
self.etcd_pods = self.__get_etcd_pods__()
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.kubelet.kubelet_service import Kubelet
kubelet_client = Kubelet(global_provider)
kubelet_client = Kubelet(get_global_provider())
@@ -1,14 +1,15 @@
import yaml
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## Kubelet ##################
class Kubelet(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = core_client
self.kubelet_config_maps = self.__get_kubelet_config_maps__()
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.rbac.rbac_service import Rbac
rbac_client = Rbac(global_provider)
rbac_client = Rbac(get_global_provider())
@@ -4,13 +4,14 @@ from kubernetes import client
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
################## Rbac ##################
class Rbac(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = client.RbacAuthorizationV1Api()
self.cluster_role_bindings = self.__list_cluster_role_bindings__()
@@ -1,4 +1,4 @@
from prowler.providers.common.common import global_provider
from prowler.providers.common.common import get_global_provider
from prowler.providers.kubernetes.services.scheduler.scheduler_service import Scheduler
scheduler_client = Scheduler(global_provider)
scheduler_client = Scheduler(get_global_provider())
@@ -1,12 +1,13 @@
from prowler.lib.logger import logger
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.lib.service.service import KubernetesService
from prowler.providers.kubernetes.services.core.core_client import core_client
################## Scheduler ##################
class Scheduler(KubernetesService):
def __init__(self, audit_info):
super().__init__(audit_info)
def __init__(self, provider: KubernetesProvider):
super().__init__(provider)
self.client = core_client
self.scheduler_pods = self.__get_scheduler_pods__()