feat(kubernetes): checks for memory limits, memory requests, and image tag (#11373)

Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
This commit is contained in:
Nikhil Kumar
2026-06-23 00:09:39 +05:30
committed by GitHub
parent 04e6e330a7
commit b6caaa4268
31 changed files with 2163 additions and 38 deletions
+1
View File
@@ -50,6 +50,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `transfer_server_pqc_ssh_kex_enabled` check for AWS provider to verify Transfer Family servers use a post-quantum hybrid SSH key exchange security policy [(#11315)](https://github.com/prowler-cloud/prowler/pull/11315)
- `acmpca_certificate_authority_pqc_key_algorithm` check and new `acmpca` service for AWS provider to verify AWS Private CA certificate authorities use a post-quantum (ML-DSA) key algorithm [(#11318)](https://github.com/prowler-cloud/prowler/pull/11318)
- `rolesanywhere_trust_anchor_pqc_pki` check and new `rolesanywhere` service for AWS provider to verify IAM Roles Anywhere trust anchors are backed by a post-quantum (ML-DSA) PKI [(#11319)](https://github.com/prowler-cloud/prowler/pull/11319)
- Kubernetes core checks for container CPU limits, CPU requests, memory limits, memory requests, fixed image tags, liveness probes, and readiness probes [(#11373)](https://github.com/prowler-cloud/prowler/pull/11373)
### 🔄 Changed
@@ -0,0 +1,35 @@
{
"Provider": "kubernetes",
"CheckID": "core_cpu_limits_set",
"CheckTitle": "Pod containers have CPU limits set",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "KubernetesPod",
"Description": "Ensure CPU limits are set for containers to prevent noisy neighbors and resource exhaustion.",
"Risk": "Missing CPU limits can allow containers to consume unbounded CPU leading to performance issues.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/"
],
"Remediation": {
"Code": {
"CLI": "kubectl set resources deployment/<name> -n <namespace> --containers=<container> --limits=cpu=500m",
"NativeIaC": "# Example: set cpu limits in container resources\nresources:\n limits:\n cpu: \"500m\"",
"Other": "1. Edit the Pod/Deployment manifest and set `resources.limits.cpu` for each container.",
"Terraform": ""
},
"Recommendation": {
"Text": "Define CPU limits to bound a container's CPU usage and protect node stability.",
"Url": "https://hub.prowler.com/check/core_cpu_limits_set"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Limits should be tuned per workload and monitored."
}
@@ -0,0 +1,35 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_cpu_limits_set(Check):
"""Check whether regular pod containers have CPU limits configured."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the Kubernetes pod CPU limits check.
Returns:
List of check reports for Kubernetes pods.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = (
f"Pod {pod.name} regular containers have CPU limits configured."
)
for container in (pod.containers or {}).values():
resources = container.resources or {}
limits = (
resources.get("limits") if isinstance(resources, dict) else None
)
cpu = limits.get("cpu") if limits and isinstance(limits, dict) else None
if not cpu:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} container {container.name} does not have a CPU limit configured."
break
findings.append(report)
return findings
@@ -0,0 +1,35 @@
{
"Provider": "kubernetes",
"CheckID": "core_cpu_requests_set",
"CheckTitle": "Pod containers have CPU requests set",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "KubernetesPod",
"Description": "Ensure CPU requests are set for containers to enable proper scheduling and resource guarantees.",
"Risk": "Missing CPU requests can lead to scheduling and resource contention issues.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/"
],
"Remediation": {
"Code": {
"CLI": "kubectl set resources deployment/<name> -n <namespace> --containers=<container> --requests=cpu=100m",
"NativeIaC": "# Example: set cpu requests in container resources\nresources:\n requests:\n cpu: \"100m\"",
"Other": "1. Edit the Pod/Deployment manifest and set `resources.requests.cpu` for each container.",
"Terraform": ""
},
"Recommendation": {
"Text": "Define sensible CPU requests to enable efficient scheduling and fair resource allocation.",
"Url": "https://hub.prowler.com/check/core_cpu_requests_set"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Requests should be tuned per workload and cluster capacity."
}
@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_cpu_requests_set(Check):
"""Check whether regular pod containers have CPU requests configured."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the Kubernetes pod CPU requests check.
Returns:
List of check reports for Kubernetes pods.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = (
f"Pod {pod.name} regular containers have CPU requests configured."
)
for container in (pod.containers or {}).values():
resources = container.resources or {}
requests = (
resources.get("requests") if isinstance(resources, dict) else None
)
cpu = (
requests.get("cpu")
if requests and isinstance(requests, dict)
else None
)
if not cpu:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} container {container.name} does not have a CPU request configured."
break
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "kubernetes",
"CheckID": "core_image_tag_fixed",
"CheckTitle": "Container images use fixed tags",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Pod",
"ResourceGroup": "container",
"Description": "**Kubernetes Pods** are evaluated for containers using images without a **fixed version tag**, such as `latest` or no tag at all, which results in unpredictable image versions being pulled.",
"Risk": "Using **`latest` or untagged images** breaks reproducibility and introduces risk:\n- Deployments may pull different image versions across nodes (integrity)\n- Rollbacks become unreliable when the exact image is unknown\n- Supply-chain attacks can inject malicious code via mutable tags\nCompromised or unexpected images can lead to **data breaches**, **lateral movement**, and **service disruption**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/concepts/containers/images/",
"https://kubernetes.io/docs/concepts/configuration/overview/#container-images"
],
"Remediation": {
"Code": {
"CLI": "kubectl set image deployment/<workload-name> <container-name>=<image>:<fixed-version> -n <namespace>\n# Example: kubectl set image deployment/nginx nginx=nginx:1.25.3 -n default\n# For maximum immutability, use an image digest instead: <image>@sha256:<digest>",
"NativeIaC": "",
"Other": "1. Open your Kubernetes UI (e.g., cloud provider console or Dashboard)\n2. Navigate to the failing workload (Deployment/StatefulSet/DaemonSet) or Pod and choose Edit YAML/Manifest\n3. Update the image field for each affected container to include a specific version tag or digest:\n - Example: change `nginx` or `nginx:latest` to `nginx:1.25.3` or `nginx@sha256:<digest>`\n4. Save changes (controllers will roll out new Pods). If it is a standalone Pod, delete and recreate it to apply the change",
"Terraform": "```hcl\nresource \"kubernetes_pod\" \"<example_resource_name>\" {\n metadata { name = \"<example_resource_name>\" }\n spec {\n container {\n name = \"<example_resource_name>\"\n image = \"nginx:1.25.3\" # Critical: use a fixed version tag, never 'latest' or untagged\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Pin all container images to a **specific version tag** or **digest** (`@sha256:...`) to ensure reproducible, auditable deployments.\n\n- Avoid `latest` and untagged images in production\n- Use **image digests** for maximum immutability\n- Enforce tag policies with **admission controllers** (e.g., OPA Gatekeeper, Kyverno)\n- Integrate image scanning into CI/CD pipelines for **defense in depth**",
"Url": "https://hub.prowler.com/check/core_image_tag_fixed"
}
},
"Categories": [
"container-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Using fixed image tags ensures reproducible deployments and reduces supply-chain attack surface."
}
@@ -0,0 +1,50 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
def _has_fixed_image_tag(image: str) -> bool:
if "@" in image:
return True
image_name = image.rsplit("/", 1)[-1]
if ":" not in image_name:
return False
tag = image_name.rsplit(":", 1)[-1]
return bool(tag) and tag.lower() != "latest"
class core_image_tag_fixed(Check):
"""Ensure that image tag is not set to latest or blank."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = (
f"Pod {pod.name} has fixed image tags on all containers."
)
for containers in (
pod.containers,
pod.init_containers,
pod.ephemeral_containers,
):
for container in (containers or {}).values():
image = container.image
if not _has_fixed_image_tag(image):
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} has container {container.name} with image '{image}' that does not use a fixed tag."
break
if report.status == "FAIL":
break
findings.append(report)
return findings
@@ -0,0 +1,36 @@
{
"Provider": "kubernetes",
"CheckID": "core_liveness_probe_configured",
"CheckTitle": "Pod containers have liveness probes configured",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "KubernetesPod",
"Description": "Ensure each regular pod container has a liveness probe configured to detect and restart unhealthy containers.",
"Risk": "Without liveness probes, failed containers may remain running causing degraded service or resource waste.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/",
"https://sre.google/workbook/"
],
"Remediation": {
"Code": {
"CLI": "kubectl patch deployment/<name> -n <namespace> --type='json' -p='[{\"op\":\"add\",\"path\":\"/spec/template/spec/containers/0/livenessProbe\",\"value\":{\"httpGet\":{\"path\":\"/healthz\",\"port\":8080},\"initialDelaySeconds\":10,\"periodSeconds\":10}}]'",
"NativeIaC": "# Example: add a livenessProbe to your container spec\nlivenessProbe:\n httpGet:\n path: /healthz\n port: 8080\n initialDelaySeconds: 10\n periodSeconds: 10",
"Other": "1. Edit the Pod/Deployment manifest and add a `livenessProbe` to each container.\n2. Tune `initialDelaySeconds`, `periodSeconds` and failure thresholds to your app.",
"Terraform": ""
},
"Recommendation": {
"Text": "Add and tune liveness probes for containers to allow Kubernetes to detect and restart unhealthy containers.",
"Url": "https://hub.prowler.com/check/core_liveness_probe_configured"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Presence of a probe is checked; adjust thresholds for app behavior to avoid false positives."
}
@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_liveness_probe_configured(Check):
"""Check whether regular pod containers have liveness probes configured."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the Kubernetes pod liveness probe check.
Returns:
List of check reports for Kubernetes pods.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = f"Pod {pod.name} has liveness probes configured for all regular containers."
for container in (pod.containers or {}).values():
if not container.liveness_probe:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} container {container.name} does not have a liveness probe configured."
break
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "kubernetes",
"CheckID": "core_memory_limits_set",
"CheckTitle": "Container memory limits are configured",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Pod",
"ResourceGroup": "container",
"Description": "**Kubernetes Pods** are evaluated for containers without **memory limits** configured in `resources.limits.memory`, indicating unbounded memory consumption is permitted.",
"Risk": "Without **memory limits**, a single container can consume all available node memory, causing:\n- **OOM kills** of other workloads (availability)\n- Node instability and cascading failures\n- Noisy-neighbor problems in multi-tenant clusters\nAttackers exploiting a vulnerability can amplify impact by exhausting node resources, enabling **denial of service** across co-located workloads.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/",
"https://kubernetes.io/docs/tasks/administer-cluster/manage-resources/memory-default-namespace/"
],
"Remediation": {
"Code": {
"CLI": "kubectl set resources deployment/<workload-name> -c <container-name> --limits=memory=<memory-limit> -n <namespace>\n# Example: kubectl set resources deployment/nginx -c nginx --limits=memory=128Mi -n default",
"NativeIaC": "",
"Other": "1. Open your Kubernetes UI (e.g., cloud provider console or Dashboard)\n2. Navigate to the failing workload (Deployment/StatefulSet/DaemonSet) or Pod and choose Edit YAML/Manifest\n3. Set resources.limits.memory for each affected container:\n - For controllers: spec.template.spec.containers[].resources.limits.memory\n - For standalone Pods: spec.containers[].resources.limits.memory\n4. Save changes (controllers will roll out new Pods). If it is a standalone Pod, delete and recreate it to apply the change",
"Terraform": "```hcl\nresource \"kubernetes_pod\" \"<example_resource_name>\" {\n metadata { name = \"<example_resource_name>\" }\n spec {\n container {\n name = \"<example_resource_name>\"\n image = \"<IMAGE>\"\n resources {\n limits = {\n memory = \"128Mi\" # Critical: sets a memory ceiling to prevent unbounded consumption\n }\n }\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Set **memory limits** on every container to prevent unbounded consumption and protect node stability.\n\n- Start with observed usage plus headroom; tune with **VPA** recommendations\n- Combine with **LimitRanges** and **ResourceQuotas** at the namespace level for **defense in depth**\n- Monitor memory usage and OOMKill events to right-size limits over time",
"Url": "https://hub.prowler.com/check/core_memory_limits_set"
}
},
"Categories": [
"container-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Memory limits prevent containers from consuming unbounded memory and protect node stability."
}
@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_memory_limits_set(Check):
"""Ensure that memory limits are set on all containers."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = (
f"Pod {pod.name} has memory limits set on all containers."
)
for container in (pod.containers or {}).values():
resources = container.resources or {}
limits = (
resources.get("limits") if isinstance(resources, dict) else None
)
memory = (
limits.get("memory")
if limits and isinstance(limits, dict)
else None
)
if not memory:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} does not have memory limits set on container {container.name}."
break
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "kubernetes",
"CheckID": "core_memory_requests_set",
"CheckTitle": "Container memory requests are configured",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Pod",
"ResourceGroup": "container",
"Description": "**Kubernetes Pods** are evaluated for containers without **memory requests** configured in `resources.requests.memory`, indicating the scheduler cannot guarantee memory allocation.",
"Risk": "Without **memory requests**, the scheduler cannot make informed placement decisions, leading to:\n- Overcommitted nodes that trigger **OOM kills** (availability)\n- Unpredictable performance under load\n- Inability to enforce **ResourceQuotas** effectively\nMissing requests weaken cluster stability and make capacity planning unreliable, increasing blast radius during incidents.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/",
"https://kubernetes.io/docs/tasks/administer-cluster/manage-resources/memory-default-namespace/"
],
"Remediation": {
"Code": {
"CLI": "kubectl set resources deployment/<workload-name> -c <container-name> --requests=memory=<memory-request> -n <namespace>\n# Example: kubectl set resources deployment/nginx -c nginx --requests=memory=64Mi -n default",
"NativeIaC": "",
"Other": "1. Open your Kubernetes UI (e.g., cloud provider console or Dashboard)\n2. Navigate to the failing workload (Deployment/StatefulSet/DaemonSet) or Pod and choose Edit YAML/Manifest\n3. Set resources.requests.memory for each affected container:\n - For controllers: spec.template.spec.containers[].resources.requests.memory\n - For standalone Pods: spec.containers[].resources.requests.memory\n4. Save changes (controllers will roll out new Pods). If it is a standalone Pod, delete and recreate it to apply the change",
"Terraform": "```hcl\nresource \"kubernetes_pod\" \"<example_resource_name>\" {\n metadata { name = \"<example_resource_name>\" }\n spec {\n container {\n name = \"<example_resource_name>\"\n image = \"<IMAGE>\"\n resources {\n requests = {\n memory = \"64Mi\" # Critical: guarantees memory for the scheduler to make informed placement\n }\n }\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Set **memory requests** on every container so the scheduler can guarantee memory allocation and place pods effectively.\n\n- Base requests on observed steady-state usage; use **VPA** for data-driven sizing\n- Enforce minimum requests via **LimitRanges** at the namespace level\n- Combine with **ResourceQuotas** for **defense in depth** against overcommitment",
"Url": "https://hub.prowler.com/check/core_memory_requests_set"
}
},
"Categories": [
"container-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Memory requests enable the scheduler to guarantee memory allocation and prevent overcommitment."
}
@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_memory_requests_set(Check):
"""Ensure that memory requests are set on all containers."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = (
f"Pod {pod.name} has memory requests set on all containers."
)
for container in (pod.containers or {}).values():
resources = container.resources or {}
requests = (
resources.get("requests") if isinstance(resources, dict) else None
)
memory = (
requests.get("memory")
if requests and isinstance(requests, dict)
else None
)
if not memory:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} does not have memory requests set on container {container.name}."
break
findings.append(report)
return findings
@@ -0,0 +1,35 @@
{
"Provider": "kubernetes",
"CheckID": "core_readiness_probe_configured",
"CheckTitle": "Regular pod containers have readiness probes configured",
"CheckType": [],
"ServiceName": "core",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "KubernetesPod",
"Description": "Ensure each regular pod container has a readiness probe configured to signal when it is ready to serve traffic.",
"Risk": "Without readiness probes, services may receive traffic before containers are ready, causing errors.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/"
],
"Remediation": {
"Code": {
"CLI": "kubectl patch deployment/<name> -n <namespace> --type='json' -p='[{\"op\":\"add\",\"path\":\"/spec/template/spec/containers/0/readinessProbe\",\"value\":{\"httpGet\":{\"path\":\"/ready\",\"port\":8080},\"initialDelaySeconds\":5,\"periodSeconds\":5}}]'",
"NativeIaC": "# Example readiness probe\nreadinessProbe:\n httpGet:\n path: /ready\n port: 8080\n initialDelaySeconds: 5\n periodSeconds: 5",
"Other": "1. Add a `readinessProbe` to the container spec in Deployments/Pods.\n2. Ensure the probe accurately reflects service readiness.",
"Terraform": ""
},
"Recommendation": {
"Text": "Add readiness probes to prevent routing traffic to unready containers.",
"Url": "https://hub.prowler.com/check/core_readiness_probe_configured"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Readiness and liveness probes serve different purposes; implement both where appropriate."
}
@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.core.core_client import core_client
class core_readiness_probe_configured(Check):
"""Check whether regular pod containers have readiness probes configured."""
def execute(self) -> list[Check_Report_Kubernetes]:
"""Execute the Kubernetes pod readiness probe check.
Returns:
List of check reports for Kubernetes pods.
"""
findings = []
for pod in core_client.pods.values():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=pod)
report.status = "PASS"
report.status_extended = f"Pod {pod.name} has readiness probes configured for all regular containers."
for container in (pod.containers or {}).values():
if not container.readiness_probe:
report.status = "FAIL"
report.status_extended = f"Pod {pod.name} container {container.name} does not have a readiness probe configured."
break
findings.append(report)
return findings
@@ -27,45 +27,11 @@ class Core(KubernetesService):
for namespace in self.namespaces:
pods = self.client.list_namespaced_pod(namespace)
for pod in pods.items:
pod_containers = {}
containers = pod.spec.containers if pod.spec.containers else []
init_containers = (
pod.spec.init_containers if pod.spec.init_containers else []
)
ephemeral_containers = (
containers = self._build_containers(pod.spec.containers)
init_containers = self._build_containers(pod.spec.init_containers)
ephemeral_containers = self._build_containers(
pod.spec.ephemeral_containers
if pod.spec.ephemeral_containers
else []
)
for container in (
containers + init_containers + ephemeral_containers
):
pod_containers[container.name] = Container(
name=container.name,
image=container.image,
command=container.command if container.command else None,
ports=(
[
{"containerPort": port.container_port}
for port in container.ports
]
if container.ports
else None
),
env=(
[
{"name": env.name, "value": env.value}
for env in container.env
]
if container.env
else None
),
security_context=(
container.security_context.to_dict()
if container.security_context
else {}
),
)
self.pods[pod.metadata.uid] = Pod(
name=pod.metadata.name,
uid=pod.metadata.uid,
@@ -85,13 +51,56 @@ class Core(KubernetesService):
if pod.spec.security_context
else {}
),
containers=pod_containers,
containers=containers,
init_containers=init_containers,
ephemeral_containers=ephemeral_containers,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@staticmethod
def _build_containers(containers) -> dict:
pod_containers = {}
for container in containers or []:
pod_containers[container.name] = Container(
name=container.name,
image=container.image,
command=container.command if container.command else None,
ports=(
[{"containerPort": port.container_port} for port in container.ports]
if container.ports
else None
),
env=(
[{"name": env.name, "value": env.value} for env in container.env]
if container.env
else None
),
security_context=(
container.security_context.to_dict()
if container.security_context
else {}
),
resources=(
container.resources.to_dict()
if getattr(container, "resources", None)
else None
),
liveness_probe=(
container.liveness_probe.to_dict()
if getattr(container, "liveness_probe", None)
else None
),
readiness_probe=(
container.readiness_probe.to_dict()
if getattr(container, "readiness_probe", None)
else None
),
)
return pod_containers
def _list_config_maps(self):
try:
response = self.client.list_config_map_for_all_namespaces()
@@ -156,6 +165,9 @@ class Container(BaseModel):
ports: Optional[List[dict]]
env: Optional[List[dict]]
security_context: dict
resources: Optional[dict] = None
liveness_probe: Optional[dict] = None
readiness_probe: Optional[dict] = None
class Pod(BaseModel):
@@ -174,6 +186,8 @@ class Pod(BaseModel):
host_network: Optional[bool]
security_context: Optional[dict]
containers: Optional[dict]
init_containers: Optional[dict] = None
ephemeral_containers: Optional[dict] = None
class ConfigMap(BaseModel):
@@ -0,0 +1,74 @@
import importlib
from unittest import mock
from prowler.providers.kubernetes.services.core.core_service import Container, Pod
from tests.providers.kubernetes.kubernetes_fixtures import (
set_mocked_kubernetes_provider,
)
def make_container(
name="app",
image="nginx:1.25",
resources=None,
liveness_probe=None,
readiness_probe=None,
):
return Container(
name=name,
image=image,
command=None,
ports=None,
env=None,
security_context={},
resources=resources,
liveness_probe=liveness_probe,
readiness_probe=readiness_probe,
)
def make_pod(
containers=None,
init_containers=None,
ephemeral_containers=None,
name="test-pod",
uid="test-pod-uid",
):
return Pod(
name=name,
uid=uid,
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=False,
host_ipc=False,
host_network=False,
security_context={},
containers=containers or {},
init_containers=init_containers or {},
ephemeral_containers=ephemeral_containers or {},
)
def make_core_client(pods):
core_client = mock.MagicMock()
core_client.pods = pods
return core_client
def run_check(module_path, class_name, core_client):
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(f"{module_path}.core_client", new=core_client),
):
check_module = importlib.import_module(module_path)
check = getattr(check_module, class_name)()
return check.execute()
@@ -0,0 +1,82 @@
from tests.providers.kubernetes.services.core.conftest import (
make_container,
make_core_client,
make_pod,
run_check,
)
MODULE = (
"prowler.providers.kubernetes.services.core.core_cpu_limits_set.core_cpu_limits_set"
)
CLASS = "core_cpu_limits_set"
class TestCoreCpuLimitsSet:
def test_no_resources(self):
result = run_check(MODULE, CLASS, make_core_client({}))
assert len(result) == 0
def test_cpu_limit_set_pass(self):
pod = make_pod(
containers={"app": make_container(resources={"limits": {"cpu": "500m"}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Pod test-pod regular containers have CPU limits configured."
)
def test_cpu_limit_missing_fail(self):
pod = make_pod(containers={"app": make_container(resources=None)})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a CPU limit configured."
)
def test_empty_cpu_limit_fail(self):
pod = make_pod(
containers={"app": make_container(resources={"limits": {"cpu": ""}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
def test_mixed_regular_containers_fail(self):
pod = make_pod(
containers={
"app": make_container(
name="app", resources={"limits": {"cpu": "500m"}}
),
"sidecar": make_container(name="sidecar", resources={"limits": {}}),
}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container sidecar does not have a CPU limit configured."
)
def test_init_and_ephemeral_containers_ignored(self):
pod = make_pod(
containers={"app": make_container(resources={"limits": {"cpu": "500m"}})},
init_containers={"init": make_container(name="init", resources=None)},
ephemeral_containers={
"debug": make_container(name="debug", resources=None)
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
@@ -0,0 +1,80 @@
from tests.providers.kubernetes.services.core.conftest import (
make_container,
make_core_client,
make_pod,
run_check,
)
MODULE = "prowler.providers.kubernetes.services.core.core_cpu_requests_set.core_cpu_requests_set"
CLASS = "core_cpu_requests_set"
class TestCoreCpuRequestsSet:
def test_no_resources(self):
result = run_check(MODULE, CLASS, make_core_client({}))
assert len(result) == 0
def test_cpu_request_set_pass(self):
pod = make_pod(
containers={"app": make_container(resources={"requests": {"cpu": "100m"}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Pod test-pod regular containers have CPU requests configured."
)
def test_cpu_request_missing_fail(self):
pod = make_pod(containers={"app": make_container(resources={})})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a CPU request configured."
)
def test_empty_cpu_request_fail(self):
pod = make_pod(
containers={"app": make_container(resources={"requests": {"cpu": ""}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
def test_mixed_regular_containers_fail(self):
pod = make_pod(
containers={
"app": make_container(
name="app", resources={"requests": {"cpu": "100m"}}
),
"sidecar": make_container(name="sidecar", resources=None),
}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container sidecar does not have a CPU request configured."
)
def test_init_and_ephemeral_containers_ignored(self):
pod = make_pod(
containers={"app": make_container(resources={"requests": {"cpu": "100m"}})},
init_containers={"init": make_container(name="init", resources=None)},
ephemeral_containers={
"debug": make_container(name="debug", resources=None)
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
@@ -0,0 +1,572 @@
from unittest import mock
from prowler.providers.kubernetes.services.core.core_service import Container, Pod
from tests.providers.kubernetes.kubernetes_fixtures import (
set_mocked_kubernetes_provider,
)
from tests.providers.kubernetes.services.core.conftest import (
make_container,
make_core_client,
make_pod,
run_check,
)
MODULE = "prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed"
CLASS = "core_image_tag_fixed"
class Test_core_image_tag_fixed:
def test_no_pods(self):
core_client = mock.MagicMock()
core_client.pods = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 0
def test_image_tag_fixed(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has fixed image tags on all containers."
)
assert result[0].resource_id == "test-uid-1234"
assert result[0].resource_name == "test-pod"
def test_pod_without_containers(self):
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers=None,
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has fixed image tags on all containers."
)
def test_image_tag_latest(self):
container = Container(
name="test-container",
image="nginx:latest",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not use a fixed tag" in result[0].status_extended
def test_image_tag_blank(self):
container = Container(
name="test-container",
image="nginx",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not use a fixed tag" in result[0].status_extended
def test_image_with_digest(self):
container = Container(
name="test-container",
image="nginx@sha256:abc123def456",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_image_with_registry_port_and_no_tag(self):
container = Container(
name="test-container",
image="localhost:5000/nginx",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not use a fixed tag" in result[0].status_extended
def test_image_tag_uppercase_latest(self):
container = Container(
name="test-container",
image="nginx:Latest",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not use a fixed tag" in result[0].status_extended
def test_image_with_empty_tag(self):
container = Container(
name="test-container",
image="nginx:",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not use a fixed tag" in result[0].status_extended
def test_mixed_containers_fails_on_first_unfixed_image(self):
fixed_container = Container(
name="fixed-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
latest_container = Container(
name="latest-container",
image="busybox:latest",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={
"fixed-container": fixed_container,
"latest-container": latest_container,
},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_image_tag_fixed.core_image_tag_fixed import (
core_image_tag_fixed,
)
check = core_image_tag_fixed()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod has container latest-container with image 'busybox:latest' that does not use a fixed tag."
)
def test_init_container_image_tag_latest(self):
pod = make_pod(
containers={"app": make_container(image="nginx:1.25.3")},
init_containers={
"init": make_container(name="init", image="busybox:latest")
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod has container init with image 'busybox:latest' that does not use a fixed tag."
)
def test_ephemeral_container_image_tag_blank(self):
pod = make_pod(
containers={"app": make_container(image="nginx:1.25.3")},
ephemeral_containers={
"debug": make_container(name="debug", image="busybox")
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod has container debug with image 'busybox' that does not use a fixed tag."
)
def test_init_and_ephemeral_image_tags_fixed_without_regular_containers(self):
pod = make_pod(
containers=None,
init_containers={"init": make_container(name="init", image="busybox:1.36")},
ephemeral_containers={
"debug": make_container(name="debug", image="debug@sha256:abc123")
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has fixed image tags on all containers."
)
@@ -0,0 +1,83 @@
from tests.providers.kubernetes.services.core.conftest import (
make_container,
make_core_client,
make_pod,
run_check,
)
MODULE = "prowler.providers.kubernetes.services.core.core_liveness_probe_configured.core_liveness_probe_configured"
CLASS = "core_liveness_probe_configured"
class TestCoreLivenessProbeConfigured:
def test_no_resources(self):
result = run_check(MODULE, CLASS, make_core_client({}))
assert len(result) == 0
def test_liveness_probe_configured_pass(self):
pod = make_pod(
containers={"app": make_container(liveness_probe={"http_get": {}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Pod test-pod has liveness probes configured for all regular containers."
)
def test_liveness_probe_missing_fail(self):
pod = make_pod(containers={"app": make_container(liveness_probe=None)})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a liveness probe configured."
)
def test_empty_liveness_probe_fail(self):
pod = make_pod(containers={"app": make_container(liveness_probe={})})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a liveness probe configured."
)
def test_mixed_regular_containers_fail(self):
pod = make_pod(
containers={
"app": make_container(name="app", liveness_probe={"http_get": {}}),
"sidecar": make_container(name="sidecar", liveness_probe=None),
}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container sidecar does not have a liveness probe configured."
)
def test_init_and_ephemeral_containers_ignored(self):
pod = make_pod(
containers={"app": make_container(liveness_probe={"http_get": {}})},
init_containers={"init": make_container(name="init", liveness_probe=None)},
ephemeral_containers={
"debug": make_container(name="debug", liveness_probe=None)
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"
@@ -0,0 +1,313 @@
from unittest import mock
from prowler.providers.kubernetes.services.core.core_service import Container, Pod
from tests.providers.kubernetes.kubernetes_fixtures import (
set_mocked_kubernetes_provider,
)
class Test_core_memory_limits_set:
def test_no_pods(self):
core_client = mock.MagicMock()
core_client.pods = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 0
def test_memory_limits_set(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={
"limits": {"memory": "128Mi", "cpu": "500m"},
"requests": {"memory": "64Mi", "cpu": "250m"},
},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has memory limits set on all containers."
)
assert result[0].resource_id == "test-uid-1234"
assert result[0].resource_name == "test-pod"
def test_memory_limits_set_with_no_containers(self):
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers=None,
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has memory limits set on all containers."
)
def test_memory_limits_not_set(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod does not have memory limits set on container test-container."
)
def test_memory_limits_missing_memory_key(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={
"limits": {"cpu": "500m"},
"requests": {"memory": "64Mi"},
},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_mixed_containers_fails_on_missing_memory_limit(self):
limited_container = Container(
name="limited-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={"limits": {"memory": "128Mi"}},
)
unlimited_container = Container(
name="unlimited-container",
image="busybox:1.36",
command=None,
ports=None,
env=None,
security_context={},
resources={"requests": {"memory": "64Mi"}},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={
"limited-container": limited_container,
"unlimited-container": unlimited_container,
},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_limits_set.core_memory_limits_set import (
core_memory_limits_set,
)
check = core_memory_limits_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod does not have memory limits set on container unlimited-container."
)
@@ -0,0 +1,313 @@
from unittest import mock
from prowler.providers.kubernetes.services.core.core_service import Container, Pod
from tests.providers.kubernetes.kubernetes_fixtures import (
set_mocked_kubernetes_provider,
)
class Test_core_memory_requests_set:
def test_no_pods(self):
core_client = mock.MagicMock()
core_client.pods = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 0
def test_memory_requests_set(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={
"limits": {"memory": "128Mi", "cpu": "500m"},
"requests": {"memory": "64Mi", "cpu": "250m"},
},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has memory requests set on all containers."
)
assert result[0].resource_id == "test-uid-1234"
assert result[0].resource_name == "test-pod"
def test_memory_requests_set_with_no_containers(self):
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers=None,
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
"Pod test-pod has memory requests set on all containers."
)
def test_memory_requests_not_set(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources=None,
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod does not have memory requests set on container test-container."
)
def test_memory_requests_missing_memory_key(self):
container = Container(
name="test-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={
"limits": {"memory": "128Mi"},
"requests": {"cpu": "250m"},
},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={"test-container": container},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_mixed_containers_fails_on_missing_memory_request(self):
requested_container = Container(
name="requested-container",
image="nginx:1.25.3",
command=None,
ports=None,
env=None,
security_context={},
resources={"requests": {"memory": "64Mi"}},
)
unrequested_container = Container(
name="unrequested-container",
image="busybox:1.36",
command=None,
ports=None,
env=None,
security_context={},
resources={"limits": {"memory": "128Mi"}},
)
pod = Pod(
name="test-pod",
uid="test-uid-1234",
namespace="default",
labels=None,
annotations=None,
node_name=None,
service_account=None,
status_phase="Running",
pod_ip="10.0.0.1",
host_ip="192.168.1.1",
host_pid=None,
host_ipc=None,
host_network=False,
security_context={},
containers={
"requested-container": requested_container,
"unrequested-container": unrequested_container,
},
)
core_client = mock.MagicMock()
core_client.pods = {"test-uid-1234": pod}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_kubernetes_provider(),
),
mock.patch(
"prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set.core_client",
new=core_client,
),
):
from prowler.providers.kubernetes.services.core.core_memory_requests_set.core_memory_requests_set import (
core_memory_requests_set,
)
check = core_memory_requests_set()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
"Pod test-pod does not have memory requests set on container unrequested-container."
)
@@ -0,0 +1,83 @@
from tests.providers.kubernetes.services.core.conftest import (
make_container,
make_core_client,
make_pod,
run_check,
)
MODULE = "prowler.providers.kubernetes.services.core.core_readiness_probe_configured.core_readiness_probe_configured"
CLASS = "core_readiness_probe_configured"
class TestCoreReadinessProbeConfigured:
def test_no_resources(self):
result = run_check(MODULE, CLASS, make_core_client({}))
assert len(result) == 0
def test_readiness_probe_configured_pass(self):
pod = make_pod(
containers={"app": make_container(readiness_probe={"http_get": {}})}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Pod test-pod has readiness probes configured for all regular containers."
)
def test_readiness_probe_missing_fail(self):
pod = make_pod(containers={"app": make_container(readiness_probe=None)})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a readiness probe configured."
)
def test_empty_readiness_probe_fail(self):
pod = make_pod(containers={"app": make_container(readiness_probe={})})
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container app does not have a readiness probe configured."
)
def test_mixed_regular_containers_fail(self):
pod = make_pod(
containers={
"app": make_container(name="app", readiness_probe={"http_get": {}}),
"sidecar": make_container(name="sidecar", readiness_probe=None),
}
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Pod test-pod container sidecar does not have a readiness probe configured."
)
def test_init_and_ephemeral_containers_ignored(self):
pod = make_pod(
containers={"app": make_container(readiness_probe={"http_get": {}})},
init_containers={"init": make_container(name="init", readiness_probe=None)},
ephemeral_containers={
"debug": make_container(name="debug", readiness_probe=None)
},
)
result = run_check(MODULE, CLASS, make_core_client({pod.uid: pod}))
assert result[0].status == "PASS"