feat(api): add health/live and health/ready probe endpoints (#11200)

This commit is contained in:
Adrián Peña
2026-05-18 16:28:36 +02:00
committed by GitHub
parent 5cd7fe4f96
commit 37aa290d1c
13 changed files with 803 additions and 8 deletions
+1
View File
@@ -59,6 +59,7 @@ jobs:
github.com:443
api.github.com:443
objects.githubusercontent.com:443
raw.githubusercontent.com:443
release-assets.githubusercontent.com:443
api.osv.dev:443
api.deps.dev:443
+1
View File
@@ -7,6 +7,7 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- GIN index on `findings(categories, resource_services, resource_regions, resource_types)` to speed up `/api/v1/finding-groups` array filters [(#11001)](https://github.com/prowler-cloud/prowler/pull/11001)
- `GET /health/live` and `GET /health/ready` Kubernetes-style probe endpoints following the IETF Health Check Response Format (`application/health+json`). Readiness verifies PostgreSQL, Valkey and Neo4j connectivity and returns 503 with per-dependency detail when any is unreachable; both endpoints centralize the API version on `config/version.py` (read from `pyproject.toml`) and are wired into the Helm charts and the Docker Compose healthcheck [(#11200)](https://github.com/prowler-cloud/prowler/pull/11200)
### 🔄 Changed
+254
View File
@@ -0,0 +1,254 @@
"""Liveness and readiness endpoints following the IETF Health Check Response
Format (draft-inadarei-api-health-check-06).
Liveness reports only process status. Readiness verifies that PostgreSQL,
Valkey and Neo4j are reachable and returns per-dependency detail when any
of them is unreachable.
"""
from __future__ import annotations
import logging
import threading
import time
from contextlib import suppress
from datetime import datetime, timezone
from typing import Any
import redis
from config.version import API_VERSION, RELEASE_ID
from django.conf import settings
from django.db import connections
from drf_spectacular.utils import extend_schema
from rest_framework import status
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.throttling import ScopedRateThrottle
from rest_framework.views import APIView
logger = logging.getLogger(__name__)
SERVICE_ID = "prowler-api"
SERVICE_DESCRIPTION = "Prowler API"
# Status vocabulary from the IETF draft (section 3.1).
STATUS_PASS = "pass"
STATUS_FAIL = "fail"
STATUS_WARN = "warn"
# Short socket timeout so a stuck Valkey cannot stall the probe.
# Neo4j inherits its driver-level ``connection_acquisition_timeout``.
VALKEY_PROBE_TIMEOUT_SECONDS = 2
# Brief cache window so high-frequency probes (ALB target groups, scrapers)
# do not stampede the actual dependency checks.
CACHE_CONTROL_HEADER = "max-age=3, must-revalidate"
# In-process readiness cache. Caps real dependency hits to roughly
# (gunicorn workers / TTL) per second regardless of incoming RPS or the
# source-IP distribution. Kept in sync with the Cache-Control max-age.
# Access is guarded by a lock so concurrent readers do not race on the
# read-decide-write cycle of the double-checked locking pattern below.
READINESS_CACHE_TTL_SECONDS = 3.0
_readiness_cache: tuple[float, dict[str, Any], int] | None = None
_readiness_cache_lock = threading.Lock()
class HealthJSONRenderer(JSONRenderer):
"""Emits responses with the ``application/health+json`` content type."""
media_type = "application/health+json"
format = "health"
def _now_iso() -> str:
return (
datetime.now(timezone.utc)
.isoformat(timespec="milliseconds")
.replace("+00:00", "Z")
)
def _measure(name: str, check_fn) -> tuple[dict[str, Any], float]:
"""Time ``check_fn`` and return ``(result, elapsed_ms)``.
``check_fn`` returns ``None`` on success or raises on failure. The full
exception is logged for operator diagnostics under ``name``; the
response payload intentionally omits the error detail to avoid leaking
infrastructure information (DNS names, ports, credentials, certificate
chains) to anonymous clients.
"""
started = time.perf_counter()
try:
check_fn()
except Exception:
elapsed_ms = (time.perf_counter() - started) * 1000
logger.warning("Health probe '%s' failed", name, exc_info=True)
return ({"status": STATUS_FAIL}, elapsed_ms)
elapsed_ms = (time.perf_counter() - started) * 1000
return ({"status": STATUS_PASS}, elapsed_ms)
def _probe_postgres() -> None:
with connections["default"].cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
def _probe_valkey() -> None:
client = redis.Redis.from_url(
settings.CELERY_BROKER_URL,
socket_connect_timeout=VALKEY_PROBE_TIMEOUT_SECONDS,
socket_timeout=VALKEY_PROBE_TIMEOUT_SECONDS,
)
try:
if not client.ping():
raise RuntimeError("PING did not return PONG")
finally:
# Best-effort cleanup: a failure releasing the socket (e.g. broken
# connection, half-closed by the server) must not mask the probe
# result. Narrowed to the exception types redis-py and the stdlib
# socket layer can raise on close.
with suppress(redis.RedisError, OSError):
client.close()
def _probe_neo4j() -> None:
# Lazy import: avoids pulling attack_paths into the boot import graph.
from api.attack_paths.database import get_driver
get_driver().verify_connectivity()
def _build_check_entry(
component_id: str,
component_type: str,
result: dict[str, Any],
elapsed_ms: float,
) -> dict[str, Any]:
entry: dict[str, Any] = {
"componentId": component_id,
"componentType": component_type,
"observedValue": round(elapsed_ms, 2),
"observedUnit": "ms",
"status": result["status"],
"time": _now_iso(),
}
if "output" in result:
entry["output"] = result["output"]
return entry
def _aggregate_status(check_entries: list[dict[str, Any]]) -> str:
statuses = {entry["status"] for entry in check_entries}
if STATUS_FAIL in statuses:
return STATUS_FAIL
if STATUS_WARN in statuses:
return STATUS_WARN
return STATUS_PASS
def _base_payload(overall_status: str) -> dict[str, Any]:
return {
"status": overall_status,
"version": API_VERSION,
"releaseId": RELEASE_ID,
"serviceId": SERVICE_ID,
"description": SERVICE_DESCRIPTION,
}
def _readiness_payload() -> tuple[dict[str, Any], int]:
global _readiness_cache
# Lock-free fast path: a stale snapshot still satisfies the freshness
# check correctly because we re-check after acquiring the lock below.
snapshot = _readiness_cache
if (
snapshot is not None
and time.monotonic() - snapshot[0] < READINESS_CACHE_TTL_SECONDS
):
return snapshot[1], snapshot[2]
with _readiness_cache_lock:
# Double-checked locking: another thread may have refreshed while
# we were waiting on the lock.
snapshot = _readiness_cache
if (
snapshot is not None
and time.monotonic() - snapshot[0] < READINESS_CACHE_TTL_SECONDS
):
return snapshot[1], snapshot[2]
postgres_result, postgres_ms = _measure("postgres", _probe_postgres)
valkey_result, valkey_ms = _measure("valkey", _probe_valkey)
neo4j_result, neo4j_ms = _measure("neo4j", _probe_neo4j)
entries = [
_build_check_entry("postgres", "datastore", postgres_result, postgres_ms),
_build_check_entry("valkey", "datastore", valkey_result, valkey_ms),
_build_check_entry("neo4j", "datastore", neo4j_result, neo4j_ms),
]
overall = _aggregate_status(entries)
payload = _base_payload(overall)
payload["checks"] = {
"postgres:responseTime": [entries[0]],
"valkey:responseTime": [entries[1]],
"neo4j:responseTime": [entries[2]],
}
http_status = (
status.HTTP_503_SERVICE_UNAVAILABLE
if overall == STATUS_FAIL
else status.HTTP_200_OK
)
_readiness_cache = (time.monotonic(), payload, http_status)
return payload, http_status
def _health_response(payload: dict[str, Any], http_status: int) -> Response:
response = Response(payload, status=http_status)
response["Cache-Control"] = CACHE_CONTROL_HEADER
return response
@extend_schema(exclude=True)
class LivenessView(APIView):
"""Liveness probe. Always 200 when the process can serve requests.
Dependencies are intentionally not consulted: a failing liveness probe
triggers a container restart, which must not happen for transient
dependency outages. Throttled per-IP so the endpoint cannot be used as
a cheap availability oracle for the process.
"""
authentication_classes: list = []
permission_classes: list = []
renderer_classes = [HealthJSONRenderer]
throttle_classes = [ScopedRateThrottle]
throttle_scope = "health-live"
def get(self, _request, *_args, **_kwargs):
return _health_response(_base_payload(STATUS_PASS), status.HTTP_200_OK)
@extend_schema(exclude=True)
class ReadinessView(APIView):
"""Readiness probe.
Returns 200 when PostgreSQL, Valkey and Neo4j all respond, or 503 with
per-dependency detail when any of them is unreachable. Per-IP throttle
plus the short in-process result cache cap the real dependency hits
regardless of inbound traffic shape.
"""
authentication_classes: list = []
permission_classes: list = []
renderer_classes = [HealthJSONRenderer]
throttle_classes = [ScopedRateThrottle]
throttle_scope = "health-ready"
def get(self, _request, *_args, **_kwargs):
payload, http_status = _readiness_payload()
return _health_response(payload, http_status)
+445
View File
@@ -0,0 +1,445 @@
"""Tests for the health endpoints.
Cover the IETF response envelope, status code mapping (200 / 503), the
``application/health+json`` media type and per-probe failure modes.
"""
from unittest.mock import patch
import pytest
from config import version as config_version
from django.core.cache import cache
from django.urls import reverse
from rest_framework import status
from rest_framework.test import APIClient
from api import health
HEALTH_MEDIA_TYPE = "application/health+json"
@pytest.fixture(autouse=True)
def _reset_health_state():
"""Per-test isolation: clear throttle counters and the readiness cache.
DRF's ScopedRateThrottle persists state in Django's cache; without
clearing it the throttle budget would be shared across tests and trip
midway through the suite.
"""
cache.clear()
health._readiness_cache = None
yield
cache.clear()
health._readiness_cache = None
@pytest.fixture
def api_client():
return APIClient()
def _assert_health_envelope(body):
"""Every health response must carry the RFC top-level descriptors."""
assert body["version"] == config_version.API_VERSION
assert body["releaseId"] == config_version.RELEASE_ID
assert body["serviceId"] == health.SERVICE_ID
assert body["description"] == health.SERVICE_DESCRIPTION
class TestLivenessEndpoint:
def test_returns_200_with_pass_status(self, api_client):
response = api_client.get(reverse("health-live"))
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"].startswith(HEALTH_MEDIA_TYPE)
assert response["Cache-Control"] == health.CACHE_CONTROL_HEADER
body = response.json()
assert body["status"] == "pass"
_assert_health_envelope(body)
def test_does_not_require_authentication(self, api_client):
api_client.credentials()
response = api_client.get(reverse("health-live"))
assert response.status_code == status.HTTP_200_OK
def test_does_not_run_dependency_checks(self, api_client):
with (
patch("api.health._probe_postgres") as mock_pg,
patch("api.health._probe_valkey") as mock_vk,
patch("api.health._probe_neo4j") as mock_neo,
):
response = api_client.get(reverse("health-live"))
assert response.status_code == status.HTTP_200_OK
mock_pg.assert_not_called()
mock_vk.assert_not_called()
mock_neo.assert_not_called()
class TestReadinessEndpoint:
@staticmethod
def _patch_probes():
return (
patch("api.health._probe_postgres", return_value=None),
patch("api.health._probe_valkey", return_value=None),
patch("api.health._probe_neo4j", return_value=None),
)
def test_returns_200_and_pass_when_all_dependencies_healthy(self, api_client):
with (
patch("api.health._probe_postgres"),
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"].startswith(HEALTH_MEDIA_TYPE)
assert response["Cache-Control"] == health.CACHE_CONTROL_HEADER
body = response.json()
_assert_health_envelope(body)
assert body["status"] == "pass"
# Per RFC, `checks` values are arrays of one or more measurement
# objects. We use a single measurement per dependency.
assert set(body["checks"].keys()) == {
"postgres:responseTime",
"valkey:responseTime",
"neo4j:responseTime",
}
for key in body["checks"]:
entries = body["checks"][key]
assert isinstance(entries, list) and len(entries) == 1
entry = entries[0]
assert entry["status"] == "pass"
assert entry["componentType"] == "datastore"
assert entry["observedUnit"] == "ms"
assert isinstance(entry["observedValue"], (int, float))
assert entry["observedValue"] >= 0
assert "time" in entry
# `output` must not leak when the check passed.
assert "output" not in entry
def test_returns_503_and_fail_when_postgres_is_down(self, api_client):
with (
patch(
"api.health._probe_postgres",
side_effect=RuntimeError("connection refused"),
),
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
body = response.json()
assert body["status"] == "fail"
pg_entry = body["checks"]["postgres:responseTime"][0]
assert pg_entry["status"] == "fail"
# Exception detail is never echoed in the response, only logged.
assert "output" not in pg_entry
assert body["checks"]["valkey:responseTime"][0]["status"] == "pass"
assert body["checks"]["neo4j:responseTime"][0]["status"] == "pass"
def test_returns_503_and_fail_when_valkey_is_down(self, api_client):
with (
patch("api.health._probe_postgres"),
patch("api.health._probe_valkey", side_effect=ConnectionError("timeout")),
patch("api.health._probe_neo4j"),
):
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
body = response.json()
assert body["status"] == "fail"
vk_entry = body["checks"]["valkey:responseTime"][0]
assert vk_entry["status"] == "fail"
assert "output" not in vk_entry
def test_returns_503_and_fail_when_neo4j_is_down(self, api_client):
with (
patch("api.health._probe_postgres"),
patch("api.health._probe_valkey"),
patch(
"api.health._probe_neo4j",
side_effect=RuntimeError("ServiceUnavailable"),
),
):
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
body = response.json()
assert body["status"] == "fail"
neo_entry = body["checks"]["neo4j:responseTime"][0]
assert neo_entry["status"] == "fail"
assert "output" not in neo_entry
def test_reports_all_failures_simultaneously(self, api_client):
with (
patch("api.health._probe_postgres", side_effect=RuntimeError("pg down")),
patch("api.health._probe_valkey", side_effect=RuntimeError("vk down")),
patch("api.health._probe_neo4j", side_effect=RuntimeError("neo down")),
):
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
body = response.json()
assert body["status"] == "fail"
for key in (
"postgres:responseTime",
"valkey:responseTime",
"neo4j:responseTime",
):
entry = body["checks"][key][0]
assert entry["status"] == "fail"
# No dependency-specific error string leaks into the payload.
assert "output" not in entry
def test_does_not_leak_exception_detail_on_failure(self, api_client):
# Sanity check: an exception message resembling infra detail
# (host, port, credentials) must not surface in the response under
# any field.
sensitive = (
"connection to server at "
'"postgres-rw.prod.svc.cluster.local" (10.0.0.5), port 5432 '
'failed: FATAL: password authentication failed for user "prowler_user"'
)
with (
patch("api.health._probe_postgres", side_effect=RuntimeError(sensitive)),
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
response = api_client.get(reverse("health-ready"))
body = response.json()
assert "output" not in body["checks"]["postgres:responseTime"][0]
payload_text = response.content.decode()
for token in (
"postgres-rw",
"10.0.0.5",
"5432",
"prowler_user",
"password authentication failed",
):
assert token not in payload_text
def test_does_not_require_authentication(self, api_client):
with (
patch("api.health._probe_postgres"),
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
api_client.credentials()
response = api_client.get(reverse("health-ready"))
assert response.status_code == status.HTTP_200_OK
class TestReadinessCache:
"""In-process cache caps the rate at which real probes hit the deps."""
def test_result_is_cached_for_ttl_seconds(self, api_client):
with (
patch("api.health._probe_postgres") as pg,
patch("api.health._probe_valkey") as vk,
patch("api.health._probe_neo4j") as neo,
):
r1 = api_client.get(reverse("health-ready"))
r2 = api_client.get(reverse("health-ready"))
assert r1.status_code == status.HTTP_200_OK
assert r2.status_code == status.HTTP_200_OK
# Second request must not trigger fresh dep checks within the TTL.
assert pg.call_count == 1
assert vk.call_count == 1
assert neo.call_count == 1
# The cached payload is returned verbatim (same timestamps too).
assert r1.json() == r2.json()
def test_re_probes_after_cache_ttl_expires(self, api_client):
with (
patch("api.health._probe_postgres") as pg,
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
api_client.get(reverse("health-ready"))
assert pg.call_count == 1
# Rewind the cached timestamp past the TTL so the next request
# is forced to recompute.
cached_ts, payload, http_status_code = health._readiness_cache
health._readiness_cache = (
cached_ts - health.READINESS_CACHE_TTL_SECONDS - 0.1,
payload,
http_status_code,
)
api_client.get(reverse("health-ready"))
assert pg.call_count == 2
def test_cache_persists_a_failing_result(self, api_client):
# A failing readiness result is cached too; this is intentional so
# an attacker spamming the endpoint during an outage cannot amplify
# the dependency load.
with (
patch("api.health._probe_postgres", side_effect=RuntimeError("down")) as pg,
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
):
r1 = api_client.get(reverse("health-ready"))
r2 = api_client.get(reverse("health-ready"))
assert r1.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
assert r2.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
assert pg.call_count == 1
class TestRateLimiting:
"""The endpoints are unauthenticated and exposed; per-IP throttle caps
naive single-source floods."""
def test_live_blocks_after_budget_exhausted(self, api_client):
# Shrink the budget to 3 req per window so the test stays fast and
# deterministic. parse_rate runs once per throttle instance and
# each request gets a fresh instance, so this patch propagates.
from rest_framework.throttling import ScopedRateThrottle
with patch.object(ScopedRateThrottle, "parse_rate", return_value=(3, 60)):
statuses = [
api_client.get(reverse("health-live")).status_code for _ in range(4)
]
assert statuses[:3] == [status.HTTP_200_OK] * 3
assert statuses[3] == status.HTTP_429_TOO_MANY_REQUESTS
def test_ready_blocks_after_budget_exhausted(self, api_client):
from rest_framework.throttling import ScopedRateThrottle
with (
patch("api.health._probe_postgres"),
patch("api.health._probe_valkey"),
patch("api.health._probe_neo4j"),
patch.object(ScopedRateThrottle, "parse_rate", return_value=(2, 60)),
):
statuses = [
api_client.get(reverse("health-ready")).status_code for _ in range(3)
]
assert statuses[:2] == [status.HTTP_200_OK] * 2
assert statuses[2] == status.HTTP_429_TOO_MANY_REQUESTS
class TestProbeImplementations:
"""Smoke tests for each probe primitive."""
@pytest.mark.django_db
def test_postgres_probe_succeeds_against_real_db(self):
assert health._probe_postgres() is None
def test_postgres_probe_propagates_db_errors(self):
class _BoomCursor:
def __enter__(self):
return self
def __exit__(self, *_):
return False
def execute(self, *_args, **_kwargs):
raise RuntimeError("boom")
def fetchone(self): # pragma: no cover - never reached
return None
with patch("api.health.connections") as mock_connections:
mock_connections.__getitem__.return_value.cursor.return_value = (
_BoomCursor()
)
with pytest.raises(RuntimeError, match="boom"):
health._probe_postgres()
def test_valkey_probe_succeeds_when_ping_returns_true(self):
with patch("api.health.redis.Redis.from_url") as mock_from_url:
mock_from_url.return_value.ping.return_value = True
assert health._probe_valkey() is None
def test_valkey_probe_raises_when_ping_returns_false(self):
with patch("api.health.redis.Redis.from_url") as mock_from_url:
mock_from_url.return_value.ping.return_value = False
with pytest.raises(RuntimeError, match="PING"):
health._probe_valkey()
def test_valkey_probe_propagates_connection_errors(self):
with patch("api.health.redis.Redis.from_url") as mock_from_url:
mock_from_url.return_value.ping.side_effect = ConnectionError("nope")
with pytest.raises(ConnectionError, match="nope"):
health._probe_valkey()
def test_valkey_probe_suppresses_redis_error_on_close(self):
# A redis-py-level failure releasing the socket must not mask a
# successful PING (best-effort cleanup contract).
import redis as redis_pkg
with patch("api.health.redis.Redis.from_url") as mock_from_url:
client = mock_from_url.return_value
client.ping.return_value = True
client.close.side_effect = redis_pkg.RedisError("connection reset")
assert health._probe_valkey() is None
client.close.assert_called_once_with()
def test_valkey_probe_suppresses_oserror_on_close(self):
# Socket-layer failures (OSError family) on close are also part of
# the swallowed scope.
with patch("api.health.redis.Redis.from_url") as mock_from_url:
client = mock_from_url.return_value
client.ping.return_value = True
client.close.side_effect = OSError("EBADF")
assert health._probe_valkey() is None
client.close.assert_called_once_with()
def test_valkey_probe_lets_unexpected_close_errors_propagate(self):
# The suppress() is deliberately narrow: anything outside
# (redis.RedisError, OSError) must surface so it is not silently
# hidden.
with patch("api.health.redis.Redis.from_url") as mock_from_url:
client = mock_from_url.return_value
client.ping.return_value = True
client.close.side_effect = RuntimeError("bug")
with pytest.raises(RuntimeError, match="bug"):
health._probe_valkey()
def test_neo4j_probe_calls_verify_connectivity(self):
with patch("api.attack_paths.database.get_driver") as mock_get_driver:
mock_get_driver.return_value.verify_connectivity.return_value = None
assert health._probe_neo4j() is None
mock_get_driver.return_value.verify_connectivity.assert_called_once_with()
def test_neo4j_probe_propagates_driver_errors(self):
with patch("api.attack_paths.database.get_driver") as mock_get_driver:
mock_get_driver.return_value.verify_connectivity.side_effect = RuntimeError(
"unreachable"
)
with pytest.raises(RuntimeError, match="unreachable"):
health._probe_neo4j()
class TestStatusAggregation:
def test_pass_when_all_checks_pass(self):
entries = [{"status": "pass"}, {"status": "pass"}]
assert health._aggregate_status(entries) == "pass"
def test_warn_when_any_check_warns_and_none_fail(self):
entries = [{"status": "pass"}, {"status": "warn"}]
assert health._aggregate_status(entries) == "warn"
def test_fail_when_any_check_fails(self):
entries = [{"status": "pass"}, {"status": "warn"}, {"status": "fail"}]
assert health._aggregate_status(entries) == "fail"
+40
View File
@@ -0,0 +1,40 @@
"""Drift checks for the API version constants.
Guarantee that ``config.version`` always reflects the canonical
``[project].version`` declared in ``api/pyproject.toml``.
"""
import tomllib
from pathlib import Path
import pytest
from config import version as config_version
@pytest.fixture(scope="module")
def pyproject_data():
here = Path(__file__).resolve()
for directory in here.parents:
candidate = directory / "pyproject.toml"
if not candidate.is_file():
continue
with candidate.open("rb") as f:
data = tomllib.load(f)
if data.get("project", {}).get("name") == "prowler-api":
return data
raise AssertionError("api/pyproject.toml not reachable from the test runner")
def test_release_id_matches_pyproject(pyproject_data):
assert config_version.RELEASE_ID == pyproject_data["project"]["version"]
def test_api_version_is_major_of_release_id():
assert config_version.API_VERSION == config_version.RELEASE_ID.split(".", 1)[0]
assert config_version.API_VERSION.isdigit()
def test_api_version_matches_v1_url_prefix():
# The public contract version surfaced in the health payload must match
# the URL namespace the API is published under.
assert config_version.API_VERSION == "1"
+2 -1
View File
@@ -21,6 +21,7 @@ from celery import chain, states
from celery.result import AsyncResult
from config.custom_logging import BackendLogger
from config.env import env
from config.version import RELEASE_ID
from config.settings.social_login import (
GITHUB_OAUTH_CALLBACK_URL,
GOOGLE_OAUTH_CALLBACK_URL,
@@ -424,7 +425,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.28.0"
spectacular_settings.VERSION = RELEASE_ID
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
+2
View File
@@ -118,6 +118,8 @@ REST_FRAMEWORK = {
"attack-paths-custom-query": env(
"DJANGO_THROTTLE_ATTACK_PATHS_CUSTOM_QUERY", default="10/min"
),
"health-live": env("DJANGO_THROTTLE_HEALTH_LIVE", default="120/min"),
"health-ready": env("DJANGO_THROTTLE_HEALTH_READY", default="60/min"),
},
}
+4
View File
@@ -1,5 +1,9 @@
from django.urls import include, path
from api.health import LivenessView, ReadinessView
urlpatterns = [
path("api/v1/", include("api.v1.urls")),
path("health/live", LivenessView.as_view(), name="health-live"),
path("health/ready", ReadinessView.as_view(), name="health-ready"),
]
+41
View File
@@ -0,0 +1,41 @@
"""Single source of truth for the API version.
The semantic version is read once from ``api/pyproject.toml`` at module
import; consumers (health payload, OpenAPI schema) read the resulting
constants. Fails fast at boot if the file cannot be located, so a
packaging mistake surfaces immediately rather than serving stale data.
"""
from __future__ import annotations
import tomllib
from pathlib import Path
_PROJECT_NAME = "prowler-api"
def _discover_release_id() -> str:
here = Path(__file__).resolve()
for directory in here.parents:
candidate = directory / "pyproject.toml"
if not candidate.is_file():
continue
with candidate.open("rb") as f:
data = tomllib.load(f)
project = data.get("project") or {}
if project.get("name") != _PROJECT_NAME:
continue
version = project.get("version")
if not isinstance(version, str) or not version:
raise RuntimeError(
f"{candidate} declares an empty or invalid [project].version"
)
return version
raise RuntimeError(
f"Could not locate the {_PROJECT_NAME} pyproject.toml from {here}"
)
RELEASE_ID: str = _discover_release_id()
# Public contract major (e.g. "1"); matches the /api/v1/ namespace.
API_VERSION: str = RELEASE_ID.split(".", 1)[0]
+5 -2
View File
@@ -590,13 +590,16 @@ resources: {}
# memory: 128Mi
# This is to setup the liveness and readiness probes more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
# /health/live succeeds while the process answers; /health/ready also
# checks PostgreSQL, Valkey and Neo4j connectivity and returns 503 when
# any of them is unreachable.
livenessProbe:
httpGet:
path: /
path: /health/live
port: http
readinessProbe:
httpGet:
path: /
path: /health/ready
port: http
#This section is for setting up autoscaling more information can be found here: https://kubernetes.io/docs/concepts/workloads/autoscaling/
+6 -3
View File
@@ -270,20 +270,23 @@ api:
# 3m30s to setup DB
# startupProbe:
# httpGet:
# path: /api/v1/docs
# path: /health/live
# port: http
# This is to setup the liveness and readiness probes more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
# /health/live succeeds while the process answers; /health/ready also
# checks PostgreSQL, Valkey and Neo4j connectivity and returns 503 when
# any of them is unreachable.
livenessProbe:
failureThreshold: 10
httpGet:
path: /api/v1/docs
path: /health/live
port: http
periodSeconds: 20
readinessProbe:
failureThreshold: 10
httpGet:
path: /api/v1/docs
path: /health/ready
port: http
periodSeconds: 20
+1 -1
View File
@@ -37,7 +37,7 @@ services:
neo4j:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "wget -q -O /dev/null http://127.0.0.1:${DJANGO_PORT:-8080}/api/v1/ || exit 1"]
test: ["CMD-SHELL", "wget -q -O /dev/null http://127.0.0.1:${DJANGO_PORT:-8080}/health/live || exit 1"]
interval: 10s
timeout: 5s
retries: 12
+1 -1
View File
@@ -33,7 +33,7 @@ services:
neo4j:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "wget -q -O /dev/null http://127.0.0.1:${DJANGO_PORT:-8080}/api/v1/ || exit 1"]
test: ["CMD-SHELL", "wget -q -O /dev/null http://127.0.0.1:${DJANGO_PORT:-8080}/health/live || exit 1"]
interval: 10s
timeout: 5s
retries: 12