From 28c064a9b7369b4f5c69bda0dec4ead3707c194f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20De=20la=20Torre=20Vico?= Date: Tue, 16 Jun 2026 10:26:20 +0200 Subject: [PATCH] feat(api): add Server-Sent Events (SSE) infrastructure (#11556) --- api/CHANGELOG.md | 4 + api/docker-entrypoint.sh | 12 +- api/pyproject.toml | 6 +- api/src/backend/api/authentication.py | 27 ++ api/src/backend/api/sse/__init__.py | 13 + api/src/backend/api/sse/base_views.py | 46 ++++ api/src/backend/api/sse/channelmanager.py | 75 ++++++ api/src/backend/api/sse/utils.py | 51 ++++ .../backend/api/tests/test_authentication.py | 63 ++++- api/src/backend/api/tests/test_sse.py | 191 ++++++++++++++ api/src/backend/config/django/base.py | 3 + api/src/backend/config/guniconf.py | 9 + .../backend/config/settings/eventstream.py | 41 +++ api/uv.lock | 60 ++++- docs/developer-guide/server-sent-events.mdx | 241 ++++++++++++++++++ docs/docs.json | 3 +- 16 files changed, 832 insertions(+), 13 deletions(-) create mode 100644 api/src/backend/api/sse/__init__.py create mode 100644 api/src/backend/api/sse/base_views.py create mode 100644 api/src/backend/api/sse/channelmanager.py create mode 100644 api/src/backend/api/sse/utils.py create mode 100644 api/src/backend/api/tests/test_sse.py create mode 100644 api/src/backend/config/settings/eventstream.py create mode 100644 docs/developer-guide/server-sent-events.mdx diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 6fa66193e3..afd8b4b071 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to the **Prowler API** are documented in this file. ## [1.32.0] (Prowler UNRELEASED) +### 🚀 Added + +- Server-Sent Events (SSE) infrastructure for the API: a base viewset, a tenant-aware channel manager, and channel-name helpers backed by `django-eventstream` over Valkey Pub/Sub and served through the Gunicorn ASGI worker, so feature endpoints can stream events to clients over a single long-lived connection [(#11556)](https://github.com/prowler-cloud/prowler/pull/11556) + ### 🔐 Security - `aiohttp` to 3.14.0 and `idna` to 3.15, patching known CVEs [(#11596)](https://github.com/prowler-cloud/prowler/pull/11596) diff --git a/api/docker-entrypoint.sh b/api/docker-entrypoint.sh index 9b2964b479..e077a3bfd6 100755 --- a/api/docker-entrypoint.sh +++ b/api/docker-entrypoint.sh @@ -21,13 +21,19 @@ apply_fixtures() { } start_dev_server() { - echo "Starting the development server..." - exec uv run python manage.py runserver 0.0.0.0:"${DJANGO_PORT:-8080}" + echo "Starting the development server (Gunicorn ASGI, debug + reload)..." + # Same server/worker as prod (config.asgi via the native `asgi` worker), so + # SSE streams run on the event loop exactly as they do in production. DEBUG is + # on so guniconf's `reload = DEBUG` hot-reloads edited code (and flips + # `preload_app` off so reload actually takes). + export DJANGO_DEBUG="${DJANGO_DEBUG:-True}" + export DJANGO_BIND_ADDRESS="${DJANGO_BIND_ADDRESS:-0.0.0.0}" + exec uv run gunicorn -c config/guniconf.py config.asgi:application } start_prod_server() { echo "Starting the Gunicorn server..." - exec uv run gunicorn -c config/guniconf.py config.wsgi:application + exec uv run gunicorn -c config/guniconf.py config.asgi:application } resolve_worker_hostname() { diff --git a/api/pyproject.toml b/api/pyproject.toml index 521d496e5b..baeeb6ea0a 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -41,7 +41,8 @@ dependencies = [ "drf-spectacular==0.27.2", "drf-spectacular-jsonapi==0.5.1", "defusedxml==0.7.1", - "gunicorn==23.0.0", + "django-eventstream==5.3.3", + "gunicorn==26.0.0", "lxml==6.1.0", "prowler @ git+https://github.com/prowler-cloud/prowler.git@master", "psycopg2-binary==2.9.9", @@ -209,6 +210,7 @@ constraint-dependencies = [ "django-celery-results==2.6.0", "django-cors-headers==4.4.0", "django-environ==0.11.2", + "django-eventstream==5.3.3", "django-filter==24.3", "django-guid==3.5.0", "django-postgres-extra==2.0.9", @@ -253,7 +255,7 @@ constraint-dependencies = [ "grpc-google-iam-v1==0.14.3", "grpcio==1.76.0", "grpcio-status==1.76.0", - "gunicorn==23.0.0", + "gunicorn==26.0.0", "h11==0.16.0", "h2==4.3.0", "hpack==4.1.0", diff --git a/api/src/backend/api/authentication.py b/api/src/backend/api/authentication.py index 499e290bb7..04740ac219 100644 --- a/api/src/backend/api/authentication.py +++ b/api/src/backend/api/authentication.py @@ -93,3 +93,30 @@ class CombinedJWTOrAPIKeyAuthentication(BaseAuthentication): # Default fallback return self.jwt_auth.authenticate(request) + + +class SSEAuthentication(CombinedJWTOrAPIKeyAuthentication): + """JWT/API-Key auth that also accepts `?access_token=`. + + Browser `EventSource` is the only widely available SSE client API + and it cannot set the `Authorization` header (its constructor takes + only a URL and `withCredentials`). To keep browser SSE clients on + the same auth stack as the rest of the API, SSE endpoints additionally + accept a JWT via the `?access_token=` query parameter — the + standard parameter name defined in RFC 6750 Section 2.3 for bearer tokens. + """ + + def authenticate(self, request: Request): + auth_header = request.headers.get("Authorization", "") + if auth_header: + return super().authenticate(request) + + raw_token = request.query_params.get("access_token") + if not raw_token: + # No header and no query token — let the default path raise + # the canonical AuthenticationFailed via the parent class. + return super().authenticate(request) + + validated_token = self.jwt_auth.get_validated_token(raw_token) + user = self.jwt_auth.get_user(validated_token) + return user, validated_token diff --git a/api/src/backend/api/sse/__init__.py b/api/src/backend/api/sse/__init__.py new file mode 100644 index 0000000000..244bd3c9ad --- /dev/null +++ b/api/src/backend/api/sse/__init__.py @@ -0,0 +1,13 @@ +"""Platform Server-Sent Events (SSE) infrastructure. + +Wires `django-eventstream` into the API: a base viewset features +subclass to expose an SSE endpoint +(:class:`api.sse.base_views.BaseSSEViewSet`), the channel manager that +enforces the tenant gate (:class:`api.sse.channelmanager.SSEChannelManager`), +and the channel-name helpers (:func:`api.sse.utils.make_channel_name`). +""" + +from api.sse.utils import make_channel_name +from api.sse.base_views import BaseSSEViewSet + +__all__ = ["BaseSSEViewSet", "make_channel_name"] diff --git a/api/src/backend/api/sse/base_views.py b/api/src/backend/api/sse/base_views.py new file mode 100644 index 0000000000..c4a24540ee --- /dev/null +++ b/api/src/backend/api/sse/base_views.py @@ -0,0 +1,46 @@ +"""Base view class for SSE endpoints.""" + +from api.authentication import SSEAuthentication +from api.base_views import BaseRLSViewSet +from django_eventstream.renderers import SSEEventRenderer +from django_eventstream.views import events + + +class BaseSSEViewSet(BaseRLSViewSet): + """Base class for platform SSE endpoints. + + Subclasses override method `get_channels` to declare the channel + names the connection should subscribe to — the same way a regular + DRF viewset overrides method `get_queryset`. The channel manager + reads the result from `request.sse_channels`; there is no other + coupling between platform and feature. + """ + + authentication_classes = [SSEAuthentication] + # Pin the SSE renderer so content negotiation accepts the browser's + # `Accept: text/event-stream`. + renderer_classes = [SSEEventRenderer] + + def get_channels(self) -> set[str]: + """Return the channels this connection subscribes to. + + Implementations MUST raise the relevant DRF exceptions + (`NotAuthenticated`, `PermissionDenied`, `NotFound`) when + authorization fails. Returning an empty set would surface as + django-eventstream's "No channels specified" which masks the + real cause. + """ + raise NotImplementedError + + def get_queryset(self): + # Most SSE viewsets only need `get_channels` and never call + # `get_queryset` (the SSE list path bypasses serialization + # entirely). Subclasses that perform their own queryset lookup + # inside `get_channels` should override; the default raises + # the same error a missing override on a ModelViewSet would. + raise NotImplementedError + + def list(self, request, *_args, **kwargs): + """Resolve channels under the regular DRF stack and stream.""" + request.sse_channels = self.get_channels() + return events(request, **kwargs) diff --git a/api/src/backend/api/sse/channelmanager.py b/api/src/backend/api/sse/channelmanager.py new file mode 100644 index 0000000000..72b2c18367 --- /dev/null +++ b/api/src/backend/api/sse/channelmanager.py @@ -0,0 +1,75 @@ +"""Channel manager that wires `django-eventstream` to platform SSE views.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from uuid import UUID + +from django_eventstream.channelmanager import DefaultChannelManager +from rest_framework.request import Request + +from api.sse.utils import tenant_id_from_channel + +if TYPE_CHECKING: + from api.models import User + + +class SSEChannelManager(DefaultChannelManager): + """Connect `django-eventstream` to the platform's SSE viewsets.""" + + def get_channels_for_request(self, request: Request, view_kwargs: dict) -> set[str]: # noqa: vulture + """Return the request's channels scoped to the active JWT tenant. + + Args: + request: The authenticated DRF request, carrying `tenant_id` + (set by `BaseRLSViewSet`) and `sse_channels` (set by + `BaseSSEViewSet.list`). + view_kwargs: URL keyword arguments from django-eventstream; + unused because channels are resolved on the request. + + Returns: + The subset of `request.sse_channels` whose embedded tenant + matches the active request tenant. + """ + try: + request_tenant_id = UUID(str(getattr(request, "tenant_id", None))) + except (TypeError, ValueError): + return set() + return { + channel + for channel in getattr(request, "sse_channels", set()) + if tenant_id_from_channel(channel) == request_tenant_id + } + + def can_read_channel(self, user: "User | None", channel: str) -> bool: + """Re-verify tenant membership once the stream is established. + + Args: + user: The connection's authenticated `User`, or `None` for an + anonymous connection — django-eventstream passes `None` + rather than an `AnonymousUser`. + channel: The channel name being read, in the canonical + `::` format. + + Returns: + `True` only when `user` is authenticated and a member of the + tenant embedded in `channel`; `False` otherwise, including for + anonymous connections and malformed channel names. + """ + if user is None or not user.is_authenticated: + return False + tenant_id = tenant_id_from_channel(channel) + if tenant_id is None: + return False + return user.is_member_of_tenant(tenant_id) + + def is_channel_reliable(self, channel: str) -> bool: + """Report whether the channel keeps a server-side replay buffer. + + Args: + channel: The channel name being queried. + + Returns: + `False`, unconditionally. Replay storage is not configured + """ + return False diff --git a/api/src/backend/api/sse/utils.py b/api/src/backend/api/sse/utils.py new file mode 100644 index 0000000000..a30ed26311 --- /dev/null +++ b/api/src/backend/api/sse/utils.py @@ -0,0 +1,51 @@ +"""Channel-name convention shared by SSE publishers, consumers, and the +channel manager. The format is `::`. +""" + +from __future__ import annotations + +import uuid + +CHANNEL_SEPARATOR = ":" + + +def make_channel_name( + prefix: str, + tenant_id: str | uuid.UUID, + resource_id: str | uuid.UUID, +) -> str: + """Build the canonical channel name for a resource. + + Args: + prefix: Feature-owned prefix (e.g. `"lighthouse-session"`). + tenant_id: Tenant the resource belongs to. + resource_id: Resource identifier within the tenant. + + Raises: + ValueError: If any segment contains `CHANNEL_SEPARATOR`, which + would break the `::` contract + and let a crafted name smuggle extra segments past the parser. + """ + segments = (str(prefix), str(tenant_id), str(resource_id)) + if any(CHANNEL_SEPARATOR in segment for segment in segments): + raise ValueError( + f"Channel segments must not contain '{CHANNEL_SEPARATOR}': {segments!r}" + ) + return CHANNEL_SEPARATOR.join(segments) + + +def tenant_id_from_channel(channel: str) -> uuid.UUID | None: + """Return the tenant UUID embedded in *channel*, or `None` if + *channel* does not follow the platform convention. + + A `None` result MUST be treated by callers as "not authorized" or + a malformed channel cannot be safely read. + """ + segments = channel.split(CHANNEL_SEPARATOR) + if len(segments) != 3: + # Reject non-canonical names + return None + try: + return uuid.UUID(segments[1]) + except ValueError: + return None diff --git a/api/src/backend/api/tests/test_authentication.py b/api/src/backend/api/tests/test_authentication.py index 6745c36e91..505d7c9320 100644 --- a/api/src/backend/api/tests/test_authentication.py +++ b/api/src/backend/api/tests/test_authentication.py @@ -1,13 +1,13 @@ import time from datetime import datetime, timedelta, timezone -from unittest.mock import patch +from unittest.mock import MagicMock, patch from uuid import uuid4 import pytest from django.test import RequestFactory from rest_framework.exceptions import AuthenticationFailed -from api.authentication import TenantAPIKeyAuthentication +from api.authentication import SSEAuthentication, TenantAPIKeyAuthentication from api.db_router import MainRouter from api.models import TenantAPIKey @@ -382,3 +382,62 @@ class TestTenantAPIKeyAuthentication: auth_backend.authenticate(request) assert str(exc_info.value.detail) == "API Key has already expired." + + +class TestSSEAuthentication: + """`SSEAuthentication` adds an `?access_token=` fallback for + browser `EventSource` clients while keeping the standard + `Authorization` header as the authoritative source.""" + + def test_header_present_delegates_to_super(self): + request = MagicMock() + request.headers = {"Authorization": "Bearer header-token"} + with patch.object( + SSEAuthentication.__bases__[0], "authenticate", return_value=("user", "tok") + ) as super_auth: + result = SSEAuthentication().authenticate(request) + super_auth.assert_called_once_with(request) + assert result == ("user", "tok") + + def test_no_header_no_query_token_delegates_to_super(self): + request = MagicMock() + request.headers = {} + request.query_params = {} + with patch.object( + SSEAuthentication.__bases__[0], "authenticate", return_value=None + ) as super_auth: + result = SSEAuthentication().authenticate(request) + super_auth.assert_called_once_with(request) + assert result is None + + def test_query_token_used_only_as_fallback(self): + request = MagicMock() + request.headers = {} + request.query_params = {"access_token": "query-jwt"} + + jwt_instance = MagicMock() + jwt_instance.get_validated_token.return_value = "validated" + jwt_instance.get_user.return_value = "query-user" + + with patch.object(SSEAuthentication, "jwt_auth", jwt_instance): + user, token = SSEAuthentication().authenticate(request) + + jwt_instance.get_validated_token.assert_called_once_with("query-jwt") + assert user == "query-user" + assert token == "validated" + + def test_query_token_invalid_raises_authentication_failed(self): + request = MagicMock() + request.headers = {} + request.query_params = {"access_token": "bad-token"} + + jwt_instance = MagicMock() + jwt_instance.get_validated_token.side_effect = AuthenticationFailed( + "Invalid token" + ) + + with patch.object(SSEAuthentication, "jwt_auth", jwt_instance): + with pytest.raises(AuthenticationFailed): + SSEAuthentication().authenticate(request) + + jwt_instance.get_validated_token.assert_called_once_with("bad-token") diff --git a/api/src/backend/api/tests/test_sse.py b/api/src/backend/api/tests/test_sse.py new file mode 100644 index 0000000000..beba821e64 --- /dev/null +++ b/api/src/backend/api/tests/test_sse.py @@ -0,0 +1,191 @@ +"""Tests for the platform SSE infrastructure (``api.sse``). + +Cover the two security-critical platform pieces — the channel-name +convention (:mod:`api.sse.utils`) and the tenant gate enforced by +:class:`api.sse.channelmanager.SSEChannelManager`. The SSE authentication +class lives in :mod:`api.authentication` with the rest of the auth stack, +so its tests live in ``test_authentication.py``. Per-feature SSE endpoints +add their own tests on top of these. +""" + +import uuid +from unittest.mock import MagicMock + +import pytest +from django.http import StreamingHttpResponse +from rest_framework.test import APIRequestFactory, force_authenticate + +from api.sse.base_views import BaseSSEViewSet +from api.sse.channelmanager import SSEChannelManager +from api.sse.utils import make_channel_name, tenant_id_from_channel + + +class TestMakeChannel: + def test_round_trips_tenant_id(self): + tenant_id = uuid.uuid4() + channel = make_channel_name("lighthouse-session", tenant_id, uuid.uuid4()) + assert tenant_id_from_channel(channel) == tenant_id + + def test_accepts_str_arguments(self): + tenant_id = uuid.uuid4() + channel = make_channel_name("lighthouse-session", str(tenant_id), "resource-1") + assert channel == f"lighthouse-session:{tenant_id}:resource-1" + + def test_prefix_with_hyphen_is_not_split(self): + # Prefixes contain hyphens but never colons, so the tenant id is + # always the second colon-separated segment. + tenant_id = uuid.uuid4() + channel = make_channel_name("a-long-hyphenated-prefix", tenant_id, "res") + assert tenant_id_from_channel(channel) == tenant_id + + @pytest.mark.parametrize( + "prefix, tenant_id, resource_id", + [ + ("evil:prefix", uuid.uuid4(), "res"), + ("prefix", uuid.uuid4(), "res:extra"), + ("prefix", "tenant:smuggled", "res"), + ], + ) + def test_rejects_separator_injection(self, prefix, tenant_id, resource_id): + # A colon in any segment would let a crafted name smuggle extra + # segments past the parser, so construction must fail loudly. + with pytest.raises(ValueError): + make_channel_name(prefix, tenant_id, resource_id) + + +class TestTenantIdFromChannel: + def test_returns_none_for_too_few_segments(self): + assert tenant_id_from_channel("prefix:only") is None + assert tenant_id_from_channel("garbage") is None + + def test_returns_none_for_too_many_segments(self): + # A valid tenant UUID in position 1 must not authorize a + # non-canonical name that carries extra segments. + tenant_id = uuid.uuid4() + assert tenant_id_from_channel(f"prefix:{tenant_id}:resource:extra") is None + + def test_returns_none_for_non_uuid_tenant_segment(self): + assert tenant_id_from_channel("prefix:not-a-uuid:resource") is None + + def test_parses_valid_channel(self): + tenant_id = uuid.uuid4() + assert tenant_id_from_channel(f"prefix:{tenant_id}:resource") == tenant_id + + +@pytest.mark.django_db +class TestSSEChannelManager: + def test_member_can_read_own_tenant_channel( + self, create_test_user, tenants_fixture + ): + tenant = tenants_fixture[0] + channel = make_channel_name("lighthouse-session", tenant.id, uuid.uuid4()) + assert SSEChannelManager().can_read_channel(create_test_user, channel) + + def test_non_member_cannot_read_other_tenant_channel( + self, create_test_user, tenants_fixture + ): + # create_test_user is a member of tenant1 and tenant2 but not tenant3. + foreign_tenant = tenants_fixture[2] + channel = make_channel_name( + "lighthouse-session", foreign_tenant.id, uuid.uuid4() + ) + assert not SSEChannelManager().can_read_channel(create_test_user, channel) + + def test_anonymous_user_is_rejected(self, tenants_fixture): + channel = make_channel_name( + "lighthouse-session", tenants_fixture[0].id, uuid.uuid4() + ) + assert not SSEChannelManager().can_read_channel(None, channel) + + anon = MagicMock(is_authenticated=False) + assert not SSEChannelManager().can_read_channel(anon, channel) + + def test_malformed_channel_is_rejected(self, create_test_user, tenants_fixture): + assert not SSEChannelManager().can_read_channel(create_test_user, "garbage") + + def test_get_channels_for_request_returns_active_tenant_channels(self): + tenant_id = uuid.uuid4() + own = make_channel_name("prefix", tenant_id, "resource") + request = MagicMock() + request.tenant_id = str(tenant_id) + request.sse_channels = {own} + assert SSEChannelManager().get_channels_for_request(request, {}) == {own} + + def test_get_channels_for_request_drops_other_tenant_channels(self): + # Fail-closed: a channel for a tenant other than the active JWT + # tenant is dropped before reaching django-eventstream, even if the + # viewset mistakenly stashed it. This is the primary tenant gate that + # binds authorization to request.tenant_id, not just membership. + active_tenant = uuid.uuid4() + own = make_channel_name("prefix", active_tenant, "resource") + foreign = make_channel_name("prefix", uuid.uuid4(), "resource") + request = MagicMock() + request.tenant_id = str(active_tenant) + request.sse_channels = {own, foreign} + assert SSEChannelManager().get_channels_for_request(request, {}) == {own} + + def test_get_channels_for_request_drops_malformed_channels(self): + request = MagicMock() + request.tenant_id = str(uuid.uuid4()) + request.sse_channels = {"garbage", "prefix:not-a-uuid:resource"} + assert SSEChannelManager().get_channels_for_request(request, {}) == set() + + def test_get_channels_for_request_without_tenant_returns_empty(self): + # No active tenant on the request (auth/RLS never ran) → fail closed, + # regardless of any channels stashed on it. + request = MagicMock(spec=[]) + assert SSEChannelManager().get_channels_for_request(request, {}) == set() + + def test_get_channels_for_request_defaults_to_empty(self): + # A request that never went through BaseSSEViewSet.list has no + # sse_channels attribute; the manager must not raise. + request = object() + assert SSEChannelManager().get_channels_for_request(request, {}) == set() + + def test_channel_is_not_reliable(self): + # v1 ships without server-side replay storage. + assert ( + SSEChannelManager().is_channel_reliable("prefix:tenant:resource") is False + ) + + +@pytest.mark.django_db +class TestBaseSSEViewSet: + """End-to-end check that the base viewset opens a stream. + + ``BaseSSEViewSet.list`` hands the DRF ``Request`` straight to + django-eventstream's ``events()``, which is written for a plain + Django request. This drives a real request through the full DRF + stack (authentication, RLS, content negotiation, channel manager) + and asserts the result is an SSE stream, so the DRF/Django request + mismatch cannot regress silently. + """ + + def test_list_opens_event_stream(self, create_test_user, tenants_fixture): + tenant = tenants_fixture[0] + channel = make_channel_name("test-sse", tenant.id, uuid.uuid4()) + seen_tenant_ids = [] + + class _StreamingSSEViewSet(BaseSSEViewSet): + def get_channels(self): + # Reached only after dispatch/initial ran, so the RLS + # tenant context is already on the request. + seen_tenant_ids.append(self.request.tenant_id) + return {channel} + + request = APIRequestFactory().get("/api/v1/test-sse/stream") + force_authenticate( + request, user=create_test_user, token={"tenant_id": str(tenant.id)} + ) + + view = _StreamingSSEViewSet.as_view({"get": "list"}) + response = view(request) + + # A StreamingHttpResponse (not the plain HttpResponse used for SSE + # error envelopes) means events() accepted the DRF request, the + # channel manager handed it a non-empty channel set, and the + # stream was opened end to end. + assert isinstance(response, StreamingHttpResponse) + assert response.status_code == 200 + assert response["Content-Type"] == "text/event-stream" + assert seen_tenant_ids == [str(tenant.id)] diff --git a/api/src/backend/config/django/base.py b/api/src/backend/config/django/base.py index 38cf047ac2..3308561b48 100644 --- a/api/src/backend/config/django/base.py +++ b/api/src/backend/config/django/base.py @@ -3,6 +3,7 @@ from datetime import timedelta from config.custom_logging import LOGGING # noqa from config.env import BASE_DIR, env # noqa from config.settings.celery import * # noqa +from config.settings.eventstream import * # noqa from config.settings.partitions import * # noqa from config.settings.sentry import * # noqa from config.settings.social_login import * # noqa @@ -44,6 +45,7 @@ INSTALLED_APPS = [ "dj_rest_auth.registration", "rest_framework.authtoken", "drf_simple_apikey", + "django_eventstream", ] MIDDLEWARE = [ @@ -136,6 +138,7 @@ SPECTACULAR_SETTINGS = { } WSGI_APPLICATION = "config.wsgi.application" +ASGI_APPLICATION = "config.asgi.application" DJANGO_GUID = { "GUID_HEADER_NAME": "Transaction-ID", diff --git a/api/src/backend/config/guniconf.py b/api/src/backend/config/guniconf.py index a16c8de9a0..6f07913e81 100644 --- a/api/src/backend/config/guniconf.py +++ b/api/src/backend/config/guniconf.py @@ -25,6 +25,15 @@ bind = f"{BIND_ADDRESS}:{PORT}" workers = env.int("DJANGO_WORKERS", default=multiprocessing.cpu_count() * 2 + 1) reload = DEBUG +# Native ASGI worker (gunicorn 24+). Required so SSE endpoints can keep the +# event loop alive while waiting for events. +worker_class = env("DJANGO_WORKER_CLASS", default="asgi") + +# Preload the application before forking workers in production: the app is +# imported once in the master and workers fork from it. In development, disable +# preload so the server restarts on code changes. +preload_app = not DEBUG + # Logging logconfig_dict = DJANGO_LOGGERS gunicorn_logger = logging.getLogger(BackendLogger.GUNICORN) diff --git a/api/src/backend/config/settings/eventstream.py b/api/src/backend/config/settings/eventstream.py new file mode 100644 index 0000000000..470062050b --- /dev/null +++ b/api/src/backend/config/settings/eventstream.py @@ -0,0 +1,41 @@ +"""Server-Sent Events (SSE) configuration. + +Wires django-eventstream into the platform: Valkey Pub/Sub backend on a +dedicated DB (separate from the Celery broker), the platform channel +manager, and headers that match the existing CORS allowlist. +""" + +from config.env import env +from config.settings.celery import ( + VALKEY_HOST, + VALKEY_PASSWORD, + VALKEY_PORT, + VALKEY_SCHEME, + VALKEY_USERNAME, +) + +# Dedicated Valkey DB for the SSE Pub/Sub bus. Kept distinct from the +# Celery broker DB so a noisy broker can't shoulder out streaming +# traffic on the same keyspace. +EVENTSTREAM_VALKEY_DB = env.int("EVENTSTREAM_VALKEY_DB", default=2) + +EVENTSTREAM_REDIS: dict = { + "host": VALKEY_HOST, + "port": int(VALKEY_PORT), + "db": EVENTSTREAM_VALKEY_DB, +} +if VALKEY_PASSWORD: + EVENTSTREAM_REDIS["password"] = VALKEY_PASSWORD +if VALKEY_USERNAME: + EVENTSTREAM_REDIS["username"] = VALKEY_USERNAME +if VALKEY_SCHEME == "rediss": + EVENTSTREAM_REDIS["ssl"] = True + +# Platform channel manager — performs the per-feature authorization and +# rewrites the placeholder channel from the URL into the canonical +# tenant-scoped channel name. See ``api.sse.channelmanager``. +EVENTSTREAM_CHANNELMANAGER_CLASS = "api.sse.channelmanager.SSEChannelManager" + +# Headers a browser EventSource may legitimately send. Keep tight; the +# stream itself reads no body, so no permissive defaults. +EVENTSTREAM_ALLOW_HEADERS = "Cache-Control, Last-Event-ID" diff --git a/api/uv.lock b/api/uv.lock index 921cc70390..8673b8873b 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -146,6 +146,7 @@ constraints = [ { name = "django-celery-results", specifier = "==2.6.0" }, { name = "django-cors-headers", specifier = "==4.4.0" }, { name = "django-environ", specifier = "==0.11.2" }, + { name = "django-eventstream", specifier = "==5.3.3" }, { name = "django-filter", specifier = "==24.3" }, { name = "django-guid", specifier = "==3.5.0" }, { name = "django-postgres-extra", specifier = "==2.0.9" }, @@ -190,7 +191,7 @@ constraints = [ { name = "grpc-google-iam-v1", specifier = "==0.14.3" }, { name = "grpcio", specifier = "==1.76.0" }, { name = "grpcio-status", specifier = "==1.76.0" }, - { name = "gunicorn", specifier = "==23.0.0" }, + { name = "gunicorn", specifier = "==26.0.0" }, { name = "h11", specifier = "==0.16.0" }, { name = "h2", specifier = "==4.3.0" }, { name = "hpack", specifier = "==4.1.0" }, @@ -2356,6 +2357,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c4/f1/468b49cccba3b42dda571063a14c668bb0b53a1d5712426d18e36663bd53/django_environ-0.11.2-py2.py3-none-any.whl", hash = "sha256:0ff95ab4344bfeff693836aa978e6840abef2e2f1145adff7735892711590c05", size = 19141, upload-time = "2023-09-01T21:02:59.88Z" }, ] +[[package]] +name = "django-eventstream" +version = "5.3.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "django" }, + { name = "django-grip" }, + { name = "gripcontrol" }, + { name = "pyjwt", extra = ["crypto"] }, + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f2/49/ec6cbc24f3f30465370df7096cfea9722bad2b0c1f35a7ff5d45fb96cff6/django_eventstream-5.3.3.tar.gz", hash = "sha256:6880b03298eebf18c1b736b972fb862eaf631dfbb79f8b27496418a3495d08dc", size = 47622, upload-time = "2025-10-23T00:22:40.291Z" } + [[package]] name = "django-filter" version = "24.3" @@ -2368,6 +2382,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/09/b1/92f1c30b47c1ebf510c35a2ccad9448f73437e5891bbd2b4febe357cc3de/django_filter-24.3-py3-none-any.whl", hash = "sha256:c4852822928ce17fb699bcfccd644b3574f1a2d80aeb2b4ff4f16b02dd49dc64", size = 95011, upload-time = "2024-08-02T13:27:55.616Z" }, ] +[[package]] +name = "django-grip" +version = "3.5.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "django" }, + { name = "gripcontrol" }, + { name = "pubcontrol" }, + { name = "six" }, + { name = "werkzeug" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cb/d0/2c7b04fa864073cd8cb324f8674672162282d97540d56732cbd3a9ae5bca/django-grip-3.5.2.tar.gz", hash = "sha256:1ee1601492cd110256bd03e4a68797a9fbefa27c15f5a838bf245df97db0450c", size = 7626, upload-time = "2025-03-24T18:53:58.677Z" } + [[package]] name = "django-guid" version = "3.5.0" @@ -2979,6 +3006,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c8/ab/717c58343cf02c5265b531384b248787e04d8160b8afe53d9eec053d7b44/greenlet-3.3.1-cp312-cp312-win_arm64.whl", hash = "sha256:bfb2d1763d777de5ee495c85309460f6fd8146e50ec9d0ae0183dbf6f0a829d1", size = 226403, upload-time = "2026-01-23T15:31:39.372Z" }, ] +[[package]] +name = "gripcontrol" +version = "4.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pubcontrol" }, + { name = "pyjwt", extra = ["crypto"] }, + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4f/51/1cbf88384fbe97a1454fb0adddcdca8cb90ceb99c3250274c334db844f4f/gripcontrol-4.4.0.tar.gz", hash = "sha256:44ee6fe244a02870aa4e5bc810138ccaf5070dce5eb149b8ee9e27b960a95c2d", size = 12526, upload-time = "2026-05-14T21:19:28.49Z" } + [[package]] name = "grpc-google-iam-v1" version = "0.14.3" @@ -3040,14 +3078,14 @@ wheels = [ [[package]] name = "gunicorn" -version = "23.0.0" +version = "26.0.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "packaging" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/34/72/9614c465dc206155d93eff0ca20d42e1e35afc533971379482de953521a4/gunicorn-23.0.0.tar.gz", hash = "sha256:f014447a0101dc57e294f6c18ca6b40227a4c90e9bdb586042628030cba004ec", size = 375031, upload-time = "2024-08-10T20:25:27.378Z" } +sdist = { url = "https://files.pythonhosted.org/packages/6d/b7/a4a3f632f823e432ce6bc65f62961b7980c898c77f075a2f7118cb3846fe/gunicorn-26.0.0.tar.gz", hash = "sha256:ca9346f85e3a4aeeb64d491045c16b9a35647abd37ea15efe53080eb8b090baf", size = 727286, upload-time = "2026-05-05T06:38:25.529Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/cb/7d/6dac2a6e1eba33ee43f318edbed4ff29151a49b5d37f080aad1e6469bca4/gunicorn-23.0.0-py3-none-any.whl", hash = "sha256:ec400d38950de4dfd418cff8328b2c8faed0edb0d517d3394e457c317908ca4d", size = 85029, upload-time = "2024-08-10T20:25:24.996Z" }, + { url = "https://files.pythonhosted.org/packages/e6/40/9c2384fc2be4ad25dd4a49decd5ad9ea5a3639814c11bd40ab77cb9f0a14/gunicorn-26.0.0-py3-none-any.whl", hash = "sha256:40233d26a5f0d1872916188c276e21641155111c2853f0c2cd55260aec0d24fc", size = 212009, upload-time = "2026-05-05T06:38:23.007Z" }, ] [[package]] @@ -4510,6 +4548,7 @@ dependencies = [ { name = "django-celery-results" }, { name = "django-cors-headers" }, { name = "django-environ" }, + { name = "django-eventstream" }, { name = "django-filter" }, { name = "django-guid" }, { name = "django-postgres-extra" }, @@ -4574,6 +4613,7 @@ requires-dist = [ { name = "django-celery-results", specifier = "==2.6.0" }, { name = "django-cors-headers", specifier = "==4.4.0" }, { name = "django-environ", specifier = "==0.11.2" }, + { name = "django-eventstream", specifier = "==5.3.3" }, { name = "django-filter", specifier = "==24.3" }, { name = "django-guid", specifier = "==3.5.0" }, { name = "django-postgres-extra", specifier = "==2.0.9" }, @@ -4586,7 +4626,7 @@ requires-dist = [ { name = "drf-spectacular-jsonapi", specifier = "==0.5.1" }, { name = "fonttools", specifier = "==4.62.1" }, { name = "gevent", specifier = "==25.9.1" }, - { name = "gunicorn", specifier = "==23.0.0" }, + { name = "gunicorn", specifier = "==26.0.0" }, { name = "h2", specifier = "==4.3.0" }, { name = "lxml", specifier = "==6.1.0" }, { name = "markdown", specifier = "==3.10.2" }, @@ -4674,6 +4714,16 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7b/08/9c66c269b0d417a0af9fb969535f0371b8c538633535a7a6a5ca3f9231e2/psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab", size = 1163864, upload-time = "2023-10-28T09:37:28.155Z" }, ] +[[package]] +name = "pubcontrol" +version = "3.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyjwt", extra = ["crypto"] }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/25/6a/02202a247214a6ffd5148ab1b16aca1c334b40dca211bca0442c8b7c7447/pubcontrol-3.5.0.tar.gz", hash = "sha256:a5ec6b3f53edfd005675518e5e4cc23b34122776835ae7c6dbd1db173d1ff0cb", size = 18199, upload-time = "2023-07-05T19:11:40.477Z" } + [[package]] name = "py-deviceid" version = "0.1.1" diff --git a/docs/developer-guide/server-sent-events.mdx b/docs/developer-guide/server-sent-events.mdx new file mode 100644 index 0000000000..c91f27227f --- /dev/null +++ b/docs/developer-guide/server-sent-events.mdx @@ -0,0 +1,241 @@ +--- +title: 'Server-Sent Events (SSE)' +--- + +import { VersionBadge } from "/snippets/version-badge.mdx" + + + +This guide explains how to add a **Server-Sent Events (SSE)** endpoint to the Prowler API. SSE lets the backend push a one-way stream of events to a client over a single long-lived HTTP connection — ideal for live progress, token-by-token LLM output, or any "the server has news for you" use case where the client should not poll. + + +The platform ships the SSE **infrastructure** (`api.sse`) and wiring. No feature endpoint streams over SSE out of the box — this guide shows how to build one on top of the shared base. + + +## When to use SSE + +| Need | Use | +|------|-----| +| Server pushes incremental updates, client only reads | **SSE** | +| Bidirectional, low-latency messaging (chat both ways, games) | WebSocket | +| Client asks, server answers once | Plain REST | + +SSE is the right tool when the **client only consumes**: scan progress, long-running job checkpoints, streamed LLM tokens, cross-client resource-sync notifications. It rides on plain HTTP, reconnects automatically in the browser via the native [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) API, and needs no extra protocol. + +## How it works + +SSE is wired through [`django-eventstream`](https://github.com/fanout/django_eventstream) and a small platform layer in `api/src/backend/api/sse/`: + +| Piece | File | Responsibility | +|-------|------|----------------| +| `BaseSSEViewSet` | `api/sse/base_views.py` | Base DRF viewset a feature subclasses. The feature implements `get_channels`; the base handles auth, the tenant transaction, and delegates streaming to `django-eventstream`. | +| `SSEChannelManager` | `api/sse/channelmanager.py` | Registered in `settings.EVENTSTREAM_CHANNELMANAGER_CLASS`. Reads the channel set off the request and enforces the platform-wide tenant gate. | +| `SSEAuthentication` | `api/authentication.py` | Same JWT/API-key stack as the rest of the API, plus an `?access_token=` fallback for browser `EventSource` clients. Lives with the other authentication classes, not in the `sse` package. | +| `make_channel_name` / `tenant_id_from_channel` | `api/sse/utils.py` | Single source of truth for the channel-name format, so publishers and the channel manager agree byte-for-byte. | +| Settings | `config/settings/eventstream.py` | Valkey Pub/Sub backend (dedicated DB), channel manager, allowed headers. | + +### Transport: the server runs on ASGI + +SSE connections are long-lived. Holding one open per synchronous worker would exhaust the worker pool, so the API runs under Gunicorn's native **`asgi` worker** (`config.asgi:application`). Streams are parked on the event loop while ordinary CRUD endpoints keep their synchronous execution (Django runs sync views in a thread-sensitive executor under ASGI). This is configured in `config/guniconf.py` and used by both the dev and production entrypoints — no separate server process is needed. + +### The data flow + +``` +publisher (Celery task / view) subscriber (browser, CLI) + │ │ + │ send_event(channel, "scan.progress", …) │ GET …/event-stream + ▼ ▼ + Valkey Pub/Sub ◄────────────────────► BaseSSEViewSet.list + (EVENTSTREAM_VALKEY_DB) → get_channels() (RLS-scoped) + → SSEChannelManager (tenant gate) + → StreamingHttpResponse (text/event-stream) +``` + +A publisher anywhere in the system (most often a Celery task) calls `send_event(channel, event_type, payload)`. `django-eventstream` fans it out over Valkey Pub/Sub to every connection subscribed to that channel. + +## Adding an SSE endpoint to your feature + +The example below streams progress for a long-running **scan**. Adapt the resource, prefix, and event names to your feature. + + + + + +Channels follow the format `::`, built only through `make_channel_name`. The prefix is owned by your feature and may contain hyphens but **never colons** (the parser splits on `:`). + +```python +CHANNEL_PREFIX = "scan-progress" +``` + +The tenant id is baked into every channel name. That is what lets the platform enforce cross-tenant isolation without knowing anything about your feature. + + + + + +Create the viewset for the SSE sub-resource. The only required method is `get_channels`; it runs inside the tenant transaction set up by the base class, so any database lookup inside it is automatically RLS-scoped. + +```python +# scans/event_streams.py +from api.sse import BaseSSEViewSet, make_channel_name +from django.shortcuts import get_object_or_404 +from scans.models import Scan + +CHANNEL_PREFIX = "scan-progress" + + +class ScanEventStreamViewSet(BaseSSEViewSet): + def get_queryset(self): + # RLS already scopes to the tenant; narrow further as needed + # (e.g. only scans the requesting user may see). + return Scan.objects.filter(tenant_id=self.request.tenant_id) + + def get_channels(self) -> set[str]: + scan = get_object_or_404(self.get_queryset(), pk=self.kwargs["scan_pk"]) + return {make_channel_name(CHANNEL_PREFIX, scan.tenant_id, scan.id)} +``` + + +`get_channels` **must raise** the relevant DRF exception (`NotFound`, `PermissionDenied`, `NotAuthenticated`) when authorization fails — `get_object_or_404` does this for you. Returning an empty set surfaces as django-eventstream's confusing "No channels specified" error instead of the real cause. + + + + + + +Mount the endpoint as an `event-stream` sub-resource. Keep it **outside the DRF router**, which would force the URL into a list/detail convention. Route the `get` method to the viewset's `list` action. + +```python +# scans/urls.py +path( + "scans//event-stream", + ScanEventStreamViewSet.as_view({"get": "list"}), + name="scan-event-stream", +), +``` + + + + + +A feature owns its event types in `//events.py`: one `publish_` function per event type, each body a **single** `send_event` call so the wire-level string lives in exactly one place. + +```python +# scans/events.py +from django_eventstream import send_event + + +def publish_progress(channel: str, checked: int, total: int) -> None: + send_event(channel, "scan.progress", {"checked": checked, "total": total}) + + +def publish_end(channel: str, scan_id: str) -> None: + # Terminal event carries the canonical id so reconnecting clients + # can refetch the persisted resource over REST. + send_event(channel, "scan.end", {"scan_id": scan_id}) + + +def publish_error(channel: str, code: str, detail: str) -> None: + send_event(channel, "scan.error", {"code": code, "detail": detail}) +``` + +There is no platform-side enum, registry, or dispatch table — **the naming convention is the contract** (see below). + + + + + +Wherever the work happens — usually a Celery task — build the channel the same way and publish: + +```python +from api.sse import make_channel_name +from scans.events import publish_progress, publish_end + +channel = make_channel_name("scan-progress", scan.tenant_id, scan.id) +publish_progress(channel, checked=42, total=100) +... +publish_end(channel, scan_id=str(scan.id)) +``` + + + + + +## Event naming convention + +Every event uses an event type of the form **`.`** (lowercased, dot-separated). The verb comes from this platform-wide vocabulary — if you need a verb that is not listed, document the addition in this guide so the catalog stays discoverable. + +| Verb | When to use | +|------|-------------| +| `delta` | An incremental piece of a stream the client concatenates (LLM text tokens, audio chunks). Standard term across OpenAI / Anthropic / LiteLLM / Vercel AI SDK. | +| `start` | Begin marker for a compound operation (e.g. a tool call whose execution will be reported by a matching `end`). | +| `end` | Terminal marker. Carries the canonical resource id so reconnecting clients can refetch persisted state via REST. | +| `progress` | Periodic checkpoint with quantifiable completion, e.g. `{"checked": 42, "total": 100}`. | +| `created` / `updated` / `deleted` | Resource-lifecycle events for cross-client sync streams. | +| `error` | Terminal failure. Carries a stable `code` for client switching and a human-readable `detail`. | + + +Payloads are **flat JSON**. The wire-level `event:` field already names the event type, so do **not** wrap the payload in `{"type": ..., "data": ...}`. Include the canonical resource UUID on terminal events so reconnecting clients can reconcile via REST. + + +## Authentication + +SSE endpoints use the same authentication stack as the rest of the API. Non-browser clients (CLI, programmatic) send the standard `Authorization` header — JWT or API key. + +Browser `EventSource` is the only widely available SSE client API and it **cannot set custom headers**. For that case only, the endpoint accepts a JWT via the `?access_token=` query parameter. The header always wins when present — a header is intentional, while a query parameter can leak into referers and logs, so it is consulted only as a fallback. + +```javascript +// Browser +const es = new EventSource( + `/api/v1/scans/${scanId}/event-stream?access_token=${jwt}` +); +``` + +```bash +# CLI / programmatic — header, exactly like every other endpoint +curl -N -H "Authorization: Bearer $JWT" \ + https:///api/v1/scans/$SCAN_ID/event-stream +``` + +## Tenant isolation & security model + +Authorization is enforced at two layers: + +1. **At connect**, `get_channels` runs under the regular DRF stack inside the tenant transaction (`rls_transaction`). Resource lookups are RLS-scoped, so a user cannot even resolve a channel for a resource they cannot see. Narrow the queryset further (e.g. `created_by=request.user`) when a resource is per-user within a tenant. +2. **After connect**, `SSEChannelManager.can_read_channel` re-verifies tenant membership by parsing the tenant id embedded in the channel name. Cross-tenant subscription is rejected even if a URL-level check ever has a bug. A malformed channel name is treated as "not authorized". + +Because the tenant id lives inside the channel name, this gate works for any feature without the platform knowing anything about it. + +## Reconnect & state recovery + +The platform deliberately ships **without server-side replay** (`is_channel_reliable` returns `False`). When a client reconnects, it does **not** receive missed events. Instead: + +- Terminal events (`*.end`) carry the canonical resource **UUID**. +- On reconnect, the client refetches the authoritative state from the normal REST endpoint using that id. + +Design your event payloads accordingly: deltas are ephemeral and concatenated in-flight; the durable truth always lives behind a REST resource. + +## Local development + +- The dev and production entrypoints both launch Gunicorn with the `asgi` worker (`config.asgi:application`). In dev, `DJANGO_DEBUG=True` enables hot reload; `preload_app` is automatically disabled under debug so edited code is picked up. +- SSE uses a **dedicated Valkey database** (`EVENTSTREAM_VALKEY_DB`, default `2`) kept separate from the Celery broker so a noisy broker cannot crowd out streaming traffic. It reuses the same `VALKEY_*` connection settings as the rest of the platform. + +| Env var | Default | Purpose | +|---------|---------|---------| +| `EVENTSTREAM_VALKEY_DB` | `2` | Valkey DB index for the SSE Pub/Sub bus | +| `DJANGO_WORKER_CLASS` | `asgi` | Gunicorn worker class | + +Test the stream end to end with `curl -N` (disable buffering) and an auth header: + +```bash +curl -N -H "Authorization: Bearer $JWT" \ + http://localhost:8080/api/v1/scans/$SCAN_ID/event-stream +``` + +## Testing + +The platform basis is covered by `api/tests/test_sse.py` (channel parsing, the tenant gate, and auth precedence). For a feature endpoint, test: + +- `get_channels` returns the expected channel for an authorized resource and raises `NotFound`/`PermissionDenied` otherwise. +- Each `publish_` helper emits the correct event type and flat payload (mock `send_event`). +- The producer builds the channel with `make_channel_name` using the resource's own `tenant_id`. diff --git a/docs/docs.json b/docs/docs.json index e31575878e..51c7e739b0 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -395,7 +395,8 @@ "developer-guide/lighthouse-architecture", "developer-guide/mcp-server", "developer-guide/ai-skills", - "developer-guide/prowler-studio" + "developer-guide/prowler-studio", + "developer-guide/server-sent-events" ] }, {