mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
feat(api): add Server-Sent Events (SSE) infrastructure (#11556)
This commit is contained in:
committed by
GitHub
parent
eeb02453d1
commit
28c064a9b7
@@ -4,6 +4,10 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
|
||||
## [1.32.0] (Prowler UNRELEASED)
|
||||
|
||||
### 🚀 Added
|
||||
|
||||
- Server-Sent Events (SSE) infrastructure for the API: a base viewset, a tenant-aware channel manager, and channel-name helpers backed by `django-eventstream` over Valkey Pub/Sub and served through the Gunicorn ASGI worker, so feature endpoints can stream events to clients over a single long-lived connection [(#11556)](https://github.com/prowler-cloud/prowler/pull/11556)
|
||||
|
||||
### 🔐 Security
|
||||
|
||||
- `aiohttp` to 3.14.0 and `idna` to 3.15, patching known CVEs [(#11596)](https://github.com/prowler-cloud/prowler/pull/11596)
|
||||
|
||||
@@ -21,13 +21,19 @@ apply_fixtures() {
|
||||
}
|
||||
|
||||
start_dev_server() {
|
||||
echo "Starting the development server..."
|
||||
exec uv run python manage.py runserver 0.0.0.0:"${DJANGO_PORT:-8080}"
|
||||
echo "Starting the development server (Gunicorn ASGI, debug + reload)..."
|
||||
# Same server/worker as prod (config.asgi via the native `asgi` worker), so
|
||||
# SSE streams run on the event loop exactly as they do in production. DEBUG is
|
||||
# on so guniconf's `reload = DEBUG` hot-reloads edited code (and flips
|
||||
# `preload_app` off so reload actually takes).
|
||||
export DJANGO_DEBUG="${DJANGO_DEBUG:-True}"
|
||||
export DJANGO_BIND_ADDRESS="${DJANGO_BIND_ADDRESS:-0.0.0.0}"
|
||||
exec uv run gunicorn -c config/guniconf.py config.asgi:application
|
||||
}
|
||||
|
||||
start_prod_server() {
|
||||
echo "Starting the Gunicorn server..."
|
||||
exec uv run gunicorn -c config/guniconf.py config.wsgi:application
|
||||
exec uv run gunicorn -c config/guniconf.py config.asgi:application
|
||||
}
|
||||
|
||||
resolve_worker_hostname() {
|
||||
|
||||
+4
-2
@@ -41,7 +41,8 @@ dependencies = [
|
||||
"drf-spectacular==0.27.2",
|
||||
"drf-spectacular-jsonapi==0.5.1",
|
||||
"defusedxml==0.7.1",
|
||||
"gunicorn==23.0.0",
|
||||
"django-eventstream==5.3.3",
|
||||
"gunicorn==26.0.0",
|
||||
"lxml==6.1.0",
|
||||
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
|
||||
"psycopg2-binary==2.9.9",
|
||||
@@ -209,6 +210,7 @@ constraint-dependencies = [
|
||||
"django-celery-results==2.6.0",
|
||||
"django-cors-headers==4.4.0",
|
||||
"django-environ==0.11.2",
|
||||
"django-eventstream==5.3.3",
|
||||
"django-filter==24.3",
|
||||
"django-guid==3.5.0",
|
||||
"django-postgres-extra==2.0.9",
|
||||
@@ -253,7 +255,7 @@ constraint-dependencies = [
|
||||
"grpc-google-iam-v1==0.14.3",
|
||||
"grpcio==1.76.0",
|
||||
"grpcio-status==1.76.0",
|
||||
"gunicorn==23.0.0",
|
||||
"gunicorn==26.0.0",
|
||||
"h11==0.16.0",
|
||||
"h2==4.3.0",
|
||||
"hpack==4.1.0",
|
||||
|
||||
@@ -93,3 +93,30 @@ class CombinedJWTOrAPIKeyAuthentication(BaseAuthentication):
|
||||
|
||||
# Default fallback
|
||||
return self.jwt_auth.authenticate(request)
|
||||
|
||||
|
||||
class SSEAuthentication(CombinedJWTOrAPIKeyAuthentication):
|
||||
"""JWT/API-Key auth that also accepts `?access_token=<jwt>`.
|
||||
|
||||
Browser `EventSource` is the only widely available SSE client API
|
||||
and it cannot set the `Authorization` header (its constructor takes
|
||||
only a URL and `withCredentials`). To keep browser SSE clients on
|
||||
the same auth stack as the rest of the API, SSE endpoints additionally
|
||||
accept a JWT via the `?access_token=<jwt>` query parameter — the
|
||||
standard parameter name defined in RFC 6750 Section 2.3 for bearer tokens.
|
||||
"""
|
||||
|
||||
def authenticate(self, request: Request):
|
||||
auth_header = request.headers.get("Authorization", "")
|
||||
if auth_header:
|
||||
return super().authenticate(request)
|
||||
|
||||
raw_token = request.query_params.get("access_token")
|
||||
if not raw_token:
|
||||
# No header and no query token — let the default path raise
|
||||
# the canonical AuthenticationFailed via the parent class.
|
||||
return super().authenticate(request)
|
||||
|
||||
validated_token = self.jwt_auth.get_validated_token(raw_token)
|
||||
user = self.jwt_auth.get_user(validated_token)
|
||||
return user, validated_token
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
"""Platform Server-Sent Events (SSE) infrastructure.
|
||||
|
||||
Wires `django-eventstream` into the API: a base viewset features
|
||||
subclass to expose an SSE endpoint
|
||||
(:class:`api.sse.base_views.BaseSSEViewSet`), the channel manager that
|
||||
enforces the tenant gate (:class:`api.sse.channelmanager.SSEChannelManager`),
|
||||
and the channel-name helpers (:func:`api.sse.utils.make_channel_name`).
|
||||
"""
|
||||
|
||||
from api.sse.utils import make_channel_name
|
||||
from api.sse.base_views import BaseSSEViewSet
|
||||
|
||||
__all__ = ["BaseSSEViewSet", "make_channel_name"]
|
||||
@@ -0,0 +1,46 @@
|
||||
"""Base view class for SSE endpoints."""
|
||||
|
||||
from api.authentication import SSEAuthentication
|
||||
from api.base_views import BaseRLSViewSet
|
||||
from django_eventstream.renderers import SSEEventRenderer
|
||||
from django_eventstream.views import events
|
||||
|
||||
|
||||
class BaseSSEViewSet(BaseRLSViewSet):
|
||||
"""Base class for platform SSE endpoints.
|
||||
|
||||
Subclasses override method `get_channels` to declare the channel
|
||||
names the connection should subscribe to — the same way a regular
|
||||
DRF viewset overrides method `get_queryset`. The channel manager
|
||||
reads the result from `request.sse_channels`; there is no other
|
||||
coupling between platform and feature.
|
||||
"""
|
||||
|
||||
authentication_classes = [SSEAuthentication]
|
||||
# Pin the SSE renderer so content negotiation accepts the browser's
|
||||
# `Accept: text/event-stream`.
|
||||
renderer_classes = [SSEEventRenderer]
|
||||
|
||||
def get_channels(self) -> set[str]:
|
||||
"""Return the channels this connection subscribes to.
|
||||
|
||||
Implementations MUST raise the relevant DRF exceptions
|
||||
(`NotAuthenticated`, `PermissionDenied`, `NotFound`) when
|
||||
authorization fails. Returning an empty set would surface as
|
||||
django-eventstream's "No channels specified" which masks the
|
||||
real cause.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def get_queryset(self):
|
||||
# Most SSE viewsets only need `get_channels` and never call
|
||||
# `get_queryset` (the SSE list path bypasses serialization
|
||||
# entirely). Subclasses that perform their own queryset lookup
|
||||
# inside `get_channels` should override; the default raises
|
||||
# the same error a missing override on a ModelViewSet would.
|
||||
raise NotImplementedError
|
||||
|
||||
def list(self, request, *_args, **kwargs):
|
||||
"""Resolve channels under the regular DRF stack and stream."""
|
||||
request.sse_channels = self.get_channels()
|
||||
return events(request, **kwargs)
|
||||
@@ -0,0 +1,75 @@
|
||||
"""Channel manager that wires `django-eventstream` to platform SSE views."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
from uuid import UUID
|
||||
|
||||
from django_eventstream.channelmanager import DefaultChannelManager
|
||||
from rest_framework.request import Request
|
||||
|
||||
from api.sse.utils import tenant_id_from_channel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from api.models import User
|
||||
|
||||
|
||||
class SSEChannelManager(DefaultChannelManager):
|
||||
"""Connect `django-eventstream` to the platform's SSE viewsets."""
|
||||
|
||||
def get_channels_for_request(self, request: Request, view_kwargs: dict) -> set[str]: # noqa: vulture
|
||||
"""Return the request's channels scoped to the active JWT tenant.
|
||||
|
||||
Args:
|
||||
request: The authenticated DRF request, carrying `tenant_id`
|
||||
(set by `BaseRLSViewSet`) and `sse_channels` (set by
|
||||
`BaseSSEViewSet.list`).
|
||||
view_kwargs: URL keyword arguments from django-eventstream;
|
||||
unused because channels are resolved on the request.
|
||||
|
||||
Returns:
|
||||
The subset of `request.sse_channels` whose embedded tenant
|
||||
matches the active request tenant.
|
||||
"""
|
||||
try:
|
||||
request_tenant_id = UUID(str(getattr(request, "tenant_id", None)))
|
||||
except (TypeError, ValueError):
|
||||
return set()
|
||||
return {
|
||||
channel
|
||||
for channel in getattr(request, "sse_channels", set())
|
||||
if tenant_id_from_channel(channel) == request_tenant_id
|
||||
}
|
||||
|
||||
def can_read_channel(self, user: "User | None", channel: str) -> bool:
|
||||
"""Re-verify tenant membership once the stream is established.
|
||||
|
||||
Args:
|
||||
user: The connection's authenticated `User`, or `None` for an
|
||||
anonymous connection — django-eventstream passes `None`
|
||||
rather than an `AnonymousUser`.
|
||||
channel: The channel name being read, in the canonical
|
||||
`<prefix>:<tenant_id>:<resource_id>` format.
|
||||
|
||||
Returns:
|
||||
`True` only when `user` is authenticated and a member of the
|
||||
tenant embedded in `channel`; `False` otherwise, including for
|
||||
anonymous connections and malformed channel names.
|
||||
"""
|
||||
if user is None or not user.is_authenticated:
|
||||
return False
|
||||
tenant_id = tenant_id_from_channel(channel)
|
||||
if tenant_id is None:
|
||||
return False
|
||||
return user.is_member_of_tenant(tenant_id)
|
||||
|
||||
def is_channel_reliable(self, channel: str) -> bool:
|
||||
"""Report whether the channel keeps a server-side replay buffer.
|
||||
|
||||
Args:
|
||||
channel: The channel name being queried.
|
||||
|
||||
Returns:
|
||||
`False`, unconditionally. Replay storage is not configured
|
||||
"""
|
||||
return False
|
||||
@@ -0,0 +1,51 @@
|
||||
"""Channel-name convention shared by SSE publishers, consumers, and the
|
||||
channel manager. The format is `<prefix>:<tenant_id>:<resource_id>`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
|
||||
CHANNEL_SEPARATOR = ":"
|
||||
|
||||
|
||||
def make_channel_name(
|
||||
prefix: str,
|
||||
tenant_id: str | uuid.UUID,
|
||||
resource_id: str | uuid.UUID,
|
||||
) -> str:
|
||||
"""Build the canonical channel name for a resource.
|
||||
|
||||
Args:
|
||||
prefix: Feature-owned prefix (e.g. `"lighthouse-session"`).
|
||||
tenant_id: Tenant the resource belongs to.
|
||||
resource_id: Resource identifier within the tenant.
|
||||
|
||||
Raises:
|
||||
ValueError: If any segment contains `CHANNEL_SEPARATOR`, which
|
||||
would break the `<prefix>:<tenant_id>:<resource_id>` contract
|
||||
and let a crafted name smuggle extra segments past the parser.
|
||||
"""
|
||||
segments = (str(prefix), str(tenant_id), str(resource_id))
|
||||
if any(CHANNEL_SEPARATOR in segment for segment in segments):
|
||||
raise ValueError(
|
||||
f"Channel segments must not contain '{CHANNEL_SEPARATOR}': {segments!r}"
|
||||
)
|
||||
return CHANNEL_SEPARATOR.join(segments)
|
||||
|
||||
|
||||
def tenant_id_from_channel(channel: str) -> uuid.UUID | None:
|
||||
"""Return the tenant UUID embedded in *channel*, or `None` if
|
||||
*channel* does not follow the platform convention.
|
||||
|
||||
A `None` result MUST be treated by callers as "not authorized" or
|
||||
a malformed channel cannot be safely read.
|
||||
"""
|
||||
segments = channel.split(CHANNEL_SEPARATOR)
|
||||
if len(segments) != 3:
|
||||
# Reject non-canonical names
|
||||
return None
|
||||
try:
|
||||
return uuid.UUID(segments[1])
|
||||
except ValueError:
|
||||
return None
|
||||
@@ -1,13 +1,13 @@
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from django.test import RequestFactory
|
||||
from rest_framework.exceptions import AuthenticationFailed
|
||||
|
||||
from api.authentication import TenantAPIKeyAuthentication
|
||||
from api.authentication import SSEAuthentication, TenantAPIKeyAuthentication
|
||||
from api.db_router import MainRouter
|
||||
from api.models import TenantAPIKey
|
||||
|
||||
@@ -382,3 +382,62 @@ class TestTenantAPIKeyAuthentication:
|
||||
auth_backend.authenticate(request)
|
||||
|
||||
assert str(exc_info.value.detail) == "API Key has already expired."
|
||||
|
||||
|
||||
class TestSSEAuthentication:
|
||||
"""`SSEAuthentication` adds an `?access_token=<jwt>` fallback for
|
||||
browser `EventSource` clients while keeping the standard
|
||||
`Authorization` header as the authoritative source."""
|
||||
|
||||
def test_header_present_delegates_to_super(self):
|
||||
request = MagicMock()
|
||||
request.headers = {"Authorization": "Bearer header-token"}
|
||||
with patch.object(
|
||||
SSEAuthentication.__bases__[0], "authenticate", return_value=("user", "tok")
|
||||
) as super_auth:
|
||||
result = SSEAuthentication().authenticate(request)
|
||||
super_auth.assert_called_once_with(request)
|
||||
assert result == ("user", "tok")
|
||||
|
||||
def test_no_header_no_query_token_delegates_to_super(self):
|
||||
request = MagicMock()
|
||||
request.headers = {}
|
||||
request.query_params = {}
|
||||
with patch.object(
|
||||
SSEAuthentication.__bases__[0], "authenticate", return_value=None
|
||||
) as super_auth:
|
||||
result = SSEAuthentication().authenticate(request)
|
||||
super_auth.assert_called_once_with(request)
|
||||
assert result is None
|
||||
|
||||
def test_query_token_used_only_as_fallback(self):
|
||||
request = MagicMock()
|
||||
request.headers = {}
|
||||
request.query_params = {"access_token": "query-jwt"}
|
||||
|
||||
jwt_instance = MagicMock()
|
||||
jwt_instance.get_validated_token.return_value = "validated"
|
||||
jwt_instance.get_user.return_value = "query-user"
|
||||
|
||||
with patch.object(SSEAuthentication, "jwt_auth", jwt_instance):
|
||||
user, token = SSEAuthentication().authenticate(request)
|
||||
|
||||
jwt_instance.get_validated_token.assert_called_once_with("query-jwt")
|
||||
assert user == "query-user"
|
||||
assert token == "validated"
|
||||
|
||||
def test_query_token_invalid_raises_authentication_failed(self):
|
||||
request = MagicMock()
|
||||
request.headers = {}
|
||||
request.query_params = {"access_token": "bad-token"}
|
||||
|
||||
jwt_instance = MagicMock()
|
||||
jwt_instance.get_validated_token.side_effect = AuthenticationFailed(
|
||||
"Invalid token"
|
||||
)
|
||||
|
||||
with patch.object(SSEAuthentication, "jwt_auth", jwt_instance):
|
||||
with pytest.raises(AuthenticationFailed):
|
||||
SSEAuthentication().authenticate(request)
|
||||
|
||||
jwt_instance.get_validated_token.assert_called_once_with("bad-token")
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
"""Tests for the platform SSE infrastructure (``api.sse``).
|
||||
|
||||
Cover the two security-critical platform pieces — the channel-name
|
||||
convention (:mod:`api.sse.utils`) and the tenant gate enforced by
|
||||
:class:`api.sse.channelmanager.SSEChannelManager`. The SSE authentication
|
||||
class lives in :mod:`api.authentication` with the rest of the auth stack,
|
||||
so its tests live in ``test_authentication.py``. Per-feature SSE endpoints
|
||||
add their own tests on top of these.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
from django.http import StreamingHttpResponse
|
||||
from rest_framework.test import APIRequestFactory, force_authenticate
|
||||
|
||||
from api.sse.base_views import BaseSSEViewSet
|
||||
from api.sse.channelmanager import SSEChannelManager
|
||||
from api.sse.utils import make_channel_name, tenant_id_from_channel
|
||||
|
||||
|
||||
class TestMakeChannel:
|
||||
def test_round_trips_tenant_id(self):
|
||||
tenant_id = uuid.uuid4()
|
||||
channel = make_channel_name("lighthouse-session", tenant_id, uuid.uuid4())
|
||||
assert tenant_id_from_channel(channel) == tenant_id
|
||||
|
||||
def test_accepts_str_arguments(self):
|
||||
tenant_id = uuid.uuid4()
|
||||
channel = make_channel_name("lighthouse-session", str(tenant_id), "resource-1")
|
||||
assert channel == f"lighthouse-session:{tenant_id}:resource-1"
|
||||
|
||||
def test_prefix_with_hyphen_is_not_split(self):
|
||||
# Prefixes contain hyphens but never colons, so the tenant id is
|
||||
# always the second colon-separated segment.
|
||||
tenant_id = uuid.uuid4()
|
||||
channel = make_channel_name("a-long-hyphenated-prefix", tenant_id, "res")
|
||||
assert tenant_id_from_channel(channel) == tenant_id
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"prefix, tenant_id, resource_id",
|
||||
[
|
||||
("evil:prefix", uuid.uuid4(), "res"),
|
||||
("prefix", uuid.uuid4(), "res:extra"),
|
||||
("prefix", "tenant:smuggled", "res"),
|
||||
],
|
||||
)
|
||||
def test_rejects_separator_injection(self, prefix, tenant_id, resource_id):
|
||||
# A colon in any segment would let a crafted name smuggle extra
|
||||
# segments past the parser, so construction must fail loudly.
|
||||
with pytest.raises(ValueError):
|
||||
make_channel_name(prefix, tenant_id, resource_id)
|
||||
|
||||
|
||||
class TestTenantIdFromChannel:
|
||||
def test_returns_none_for_too_few_segments(self):
|
||||
assert tenant_id_from_channel("prefix:only") is None
|
||||
assert tenant_id_from_channel("garbage") is None
|
||||
|
||||
def test_returns_none_for_too_many_segments(self):
|
||||
# A valid tenant UUID in position 1 must not authorize a
|
||||
# non-canonical name that carries extra segments.
|
||||
tenant_id = uuid.uuid4()
|
||||
assert tenant_id_from_channel(f"prefix:{tenant_id}:resource:extra") is None
|
||||
|
||||
def test_returns_none_for_non_uuid_tenant_segment(self):
|
||||
assert tenant_id_from_channel("prefix:not-a-uuid:resource") is None
|
||||
|
||||
def test_parses_valid_channel(self):
|
||||
tenant_id = uuid.uuid4()
|
||||
assert tenant_id_from_channel(f"prefix:{tenant_id}:resource") == tenant_id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestSSEChannelManager:
|
||||
def test_member_can_read_own_tenant_channel(
|
||||
self, create_test_user, tenants_fixture
|
||||
):
|
||||
tenant = tenants_fixture[0]
|
||||
channel = make_channel_name("lighthouse-session", tenant.id, uuid.uuid4())
|
||||
assert SSEChannelManager().can_read_channel(create_test_user, channel)
|
||||
|
||||
def test_non_member_cannot_read_other_tenant_channel(
|
||||
self, create_test_user, tenants_fixture
|
||||
):
|
||||
# create_test_user is a member of tenant1 and tenant2 but not tenant3.
|
||||
foreign_tenant = tenants_fixture[2]
|
||||
channel = make_channel_name(
|
||||
"lighthouse-session", foreign_tenant.id, uuid.uuid4()
|
||||
)
|
||||
assert not SSEChannelManager().can_read_channel(create_test_user, channel)
|
||||
|
||||
def test_anonymous_user_is_rejected(self, tenants_fixture):
|
||||
channel = make_channel_name(
|
||||
"lighthouse-session", tenants_fixture[0].id, uuid.uuid4()
|
||||
)
|
||||
assert not SSEChannelManager().can_read_channel(None, channel)
|
||||
|
||||
anon = MagicMock(is_authenticated=False)
|
||||
assert not SSEChannelManager().can_read_channel(anon, channel)
|
||||
|
||||
def test_malformed_channel_is_rejected(self, create_test_user, tenants_fixture):
|
||||
assert not SSEChannelManager().can_read_channel(create_test_user, "garbage")
|
||||
|
||||
def test_get_channels_for_request_returns_active_tenant_channels(self):
|
||||
tenant_id = uuid.uuid4()
|
||||
own = make_channel_name("prefix", tenant_id, "resource")
|
||||
request = MagicMock()
|
||||
request.tenant_id = str(tenant_id)
|
||||
request.sse_channels = {own}
|
||||
assert SSEChannelManager().get_channels_for_request(request, {}) == {own}
|
||||
|
||||
def test_get_channels_for_request_drops_other_tenant_channels(self):
|
||||
# Fail-closed: a channel for a tenant other than the active JWT
|
||||
# tenant is dropped before reaching django-eventstream, even if the
|
||||
# viewset mistakenly stashed it. This is the primary tenant gate that
|
||||
# binds authorization to request.tenant_id, not just membership.
|
||||
active_tenant = uuid.uuid4()
|
||||
own = make_channel_name("prefix", active_tenant, "resource")
|
||||
foreign = make_channel_name("prefix", uuid.uuid4(), "resource")
|
||||
request = MagicMock()
|
||||
request.tenant_id = str(active_tenant)
|
||||
request.sse_channels = {own, foreign}
|
||||
assert SSEChannelManager().get_channels_for_request(request, {}) == {own}
|
||||
|
||||
def test_get_channels_for_request_drops_malformed_channels(self):
|
||||
request = MagicMock()
|
||||
request.tenant_id = str(uuid.uuid4())
|
||||
request.sse_channels = {"garbage", "prefix:not-a-uuid:resource"}
|
||||
assert SSEChannelManager().get_channels_for_request(request, {}) == set()
|
||||
|
||||
def test_get_channels_for_request_without_tenant_returns_empty(self):
|
||||
# No active tenant on the request (auth/RLS never ran) → fail closed,
|
||||
# regardless of any channels stashed on it.
|
||||
request = MagicMock(spec=[])
|
||||
assert SSEChannelManager().get_channels_for_request(request, {}) == set()
|
||||
|
||||
def test_get_channels_for_request_defaults_to_empty(self):
|
||||
# A request that never went through BaseSSEViewSet.list has no
|
||||
# sse_channels attribute; the manager must not raise.
|
||||
request = object()
|
||||
assert SSEChannelManager().get_channels_for_request(request, {}) == set()
|
||||
|
||||
def test_channel_is_not_reliable(self):
|
||||
# v1 ships without server-side replay storage.
|
||||
assert (
|
||||
SSEChannelManager().is_channel_reliable("prefix:tenant:resource") is False
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBaseSSEViewSet:
|
||||
"""End-to-end check that the base viewset opens a stream.
|
||||
|
||||
``BaseSSEViewSet.list`` hands the DRF ``Request`` straight to
|
||||
django-eventstream's ``events()``, which is written for a plain
|
||||
Django request. This drives a real request through the full DRF
|
||||
stack (authentication, RLS, content negotiation, channel manager)
|
||||
and asserts the result is an SSE stream, so the DRF/Django request
|
||||
mismatch cannot regress silently.
|
||||
"""
|
||||
|
||||
def test_list_opens_event_stream(self, create_test_user, tenants_fixture):
|
||||
tenant = tenants_fixture[0]
|
||||
channel = make_channel_name("test-sse", tenant.id, uuid.uuid4())
|
||||
seen_tenant_ids = []
|
||||
|
||||
class _StreamingSSEViewSet(BaseSSEViewSet):
|
||||
def get_channels(self):
|
||||
# Reached only after dispatch/initial ran, so the RLS
|
||||
# tenant context is already on the request.
|
||||
seen_tenant_ids.append(self.request.tenant_id)
|
||||
return {channel}
|
||||
|
||||
request = APIRequestFactory().get("/api/v1/test-sse/stream")
|
||||
force_authenticate(
|
||||
request, user=create_test_user, token={"tenant_id": str(tenant.id)}
|
||||
)
|
||||
|
||||
view = _StreamingSSEViewSet.as_view({"get": "list"})
|
||||
response = view(request)
|
||||
|
||||
# A StreamingHttpResponse (not the plain HttpResponse used for SSE
|
||||
# error envelopes) means events() accepted the DRF request, the
|
||||
# channel manager handed it a non-empty channel set, and the
|
||||
# stream was opened end to end.
|
||||
assert isinstance(response, StreamingHttpResponse)
|
||||
assert response.status_code == 200
|
||||
assert response["Content-Type"] == "text/event-stream"
|
||||
assert seen_tenant_ids == [str(tenant.id)]
|
||||
@@ -3,6 +3,7 @@ from datetime import timedelta
|
||||
from config.custom_logging import LOGGING # noqa
|
||||
from config.env import BASE_DIR, env # noqa
|
||||
from config.settings.celery import * # noqa
|
||||
from config.settings.eventstream import * # noqa
|
||||
from config.settings.partitions import * # noqa
|
||||
from config.settings.sentry import * # noqa
|
||||
from config.settings.social_login import * # noqa
|
||||
@@ -44,6 +45,7 @@ INSTALLED_APPS = [
|
||||
"dj_rest_auth.registration",
|
||||
"rest_framework.authtoken",
|
||||
"drf_simple_apikey",
|
||||
"django_eventstream",
|
||||
]
|
||||
|
||||
MIDDLEWARE = [
|
||||
@@ -136,6 +138,7 @@ SPECTACULAR_SETTINGS = {
|
||||
}
|
||||
|
||||
WSGI_APPLICATION = "config.wsgi.application"
|
||||
ASGI_APPLICATION = "config.asgi.application"
|
||||
|
||||
DJANGO_GUID = {
|
||||
"GUID_HEADER_NAME": "Transaction-ID",
|
||||
|
||||
@@ -25,6 +25,15 @@ bind = f"{BIND_ADDRESS}:{PORT}"
|
||||
workers = env.int("DJANGO_WORKERS", default=multiprocessing.cpu_count() * 2 + 1)
|
||||
reload = DEBUG
|
||||
|
||||
# Native ASGI worker (gunicorn 24+). Required so SSE endpoints can keep the
|
||||
# event loop alive while waiting for events.
|
||||
worker_class = env("DJANGO_WORKER_CLASS", default="asgi")
|
||||
|
||||
# Preload the application before forking workers in production: the app is
|
||||
# imported once in the master and workers fork from it. In development, disable
|
||||
# preload so the server restarts on code changes.
|
||||
preload_app = not DEBUG
|
||||
|
||||
# Logging
|
||||
logconfig_dict = DJANGO_LOGGERS
|
||||
gunicorn_logger = logging.getLogger(BackendLogger.GUNICORN)
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
"""Server-Sent Events (SSE) configuration.
|
||||
|
||||
Wires django-eventstream into the platform: Valkey Pub/Sub backend on a
|
||||
dedicated DB (separate from the Celery broker), the platform channel
|
||||
manager, and headers that match the existing CORS allowlist.
|
||||
"""
|
||||
|
||||
from config.env import env
|
||||
from config.settings.celery import (
|
||||
VALKEY_HOST,
|
||||
VALKEY_PASSWORD,
|
||||
VALKEY_PORT,
|
||||
VALKEY_SCHEME,
|
||||
VALKEY_USERNAME,
|
||||
)
|
||||
|
||||
# Dedicated Valkey DB for the SSE Pub/Sub bus. Kept distinct from the
|
||||
# Celery broker DB so a noisy broker can't shoulder out streaming
|
||||
# traffic on the same keyspace.
|
||||
EVENTSTREAM_VALKEY_DB = env.int("EVENTSTREAM_VALKEY_DB", default=2)
|
||||
|
||||
EVENTSTREAM_REDIS: dict = {
|
||||
"host": VALKEY_HOST,
|
||||
"port": int(VALKEY_PORT),
|
||||
"db": EVENTSTREAM_VALKEY_DB,
|
||||
}
|
||||
if VALKEY_PASSWORD:
|
||||
EVENTSTREAM_REDIS["password"] = VALKEY_PASSWORD
|
||||
if VALKEY_USERNAME:
|
||||
EVENTSTREAM_REDIS["username"] = VALKEY_USERNAME
|
||||
if VALKEY_SCHEME == "rediss":
|
||||
EVENTSTREAM_REDIS["ssl"] = True
|
||||
|
||||
# Platform channel manager — performs the per-feature authorization and
|
||||
# rewrites the placeholder channel from the URL into the canonical
|
||||
# tenant-scoped channel name. See ``api.sse.channelmanager``.
|
||||
EVENTSTREAM_CHANNELMANAGER_CLASS = "api.sse.channelmanager.SSEChannelManager"
|
||||
|
||||
# Headers a browser EventSource may legitimately send. Keep tight; the
|
||||
# stream itself reads no body, so no permissive defaults.
|
||||
EVENTSTREAM_ALLOW_HEADERS = "Cache-Control, Last-Event-ID"
|
||||
Generated
+55
-5
@@ -146,6 +146,7 @@ constraints = [
|
||||
{ name = "django-celery-results", specifier = "==2.6.0" },
|
||||
{ name = "django-cors-headers", specifier = "==4.4.0" },
|
||||
{ name = "django-environ", specifier = "==0.11.2" },
|
||||
{ name = "django-eventstream", specifier = "==5.3.3" },
|
||||
{ name = "django-filter", specifier = "==24.3" },
|
||||
{ name = "django-guid", specifier = "==3.5.0" },
|
||||
{ name = "django-postgres-extra", specifier = "==2.0.9" },
|
||||
@@ -190,7 +191,7 @@ constraints = [
|
||||
{ name = "grpc-google-iam-v1", specifier = "==0.14.3" },
|
||||
{ name = "grpcio", specifier = "==1.76.0" },
|
||||
{ name = "grpcio-status", specifier = "==1.76.0" },
|
||||
{ name = "gunicorn", specifier = "==23.0.0" },
|
||||
{ name = "gunicorn", specifier = "==26.0.0" },
|
||||
{ name = "h11", specifier = "==0.16.0" },
|
||||
{ name = "h2", specifier = "==4.3.0" },
|
||||
{ name = "hpack", specifier = "==4.1.0" },
|
||||
@@ -2356,6 +2357,19 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c4/f1/468b49cccba3b42dda571063a14c668bb0b53a1d5712426d18e36663bd53/django_environ-0.11.2-py2.py3-none-any.whl", hash = "sha256:0ff95ab4344bfeff693836aa978e6840abef2e2f1145adff7735892711590c05", size = 19141, upload-time = "2023-09-01T21:02:59.88Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "django-eventstream"
|
||||
version = "5.3.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "django" },
|
||||
{ name = "django-grip" },
|
||||
{ name = "gripcontrol" },
|
||||
{ name = "pyjwt", extra = ["crypto"] },
|
||||
{ name = "six" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/f2/49/ec6cbc24f3f30465370df7096cfea9722bad2b0c1f35a7ff5d45fb96cff6/django_eventstream-5.3.3.tar.gz", hash = "sha256:6880b03298eebf18c1b736b972fb862eaf631dfbb79f8b27496418a3495d08dc", size = 47622, upload-time = "2025-10-23T00:22:40.291Z" }
|
||||
|
||||
[[package]]
|
||||
name = "django-filter"
|
||||
version = "24.3"
|
||||
@@ -2368,6 +2382,19 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/09/b1/92f1c30b47c1ebf510c35a2ccad9448f73437e5891bbd2b4febe357cc3de/django_filter-24.3-py3-none-any.whl", hash = "sha256:c4852822928ce17fb699bcfccd644b3574f1a2d80aeb2b4ff4f16b02dd49dc64", size = 95011, upload-time = "2024-08-02T13:27:55.616Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "django-grip"
|
||||
version = "3.5.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "django" },
|
||||
{ name = "gripcontrol" },
|
||||
{ name = "pubcontrol" },
|
||||
{ name = "six" },
|
||||
{ name = "werkzeug" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/cb/d0/2c7b04fa864073cd8cb324f8674672162282d97540d56732cbd3a9ae5bca/django-grip-3.5.2.tar.gz", hash = "sha256:1ee1601492cd110256bd03e4a68797a9fbefa27c15f5a838bf245df97db0450c", size = 7626, upload-time = "2025-03-24T18:53:58.677Z" }
|
||||
|
||||
[[package]]
|
||||
name = "django-guid"
|
||||
version = "3.5.0"
|
||||
@@ -2979,6 +3006,17 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c8/ab/717c58343cf02c5265b531384b248787e04d8160b8afe53d9eec053d7b44/greenlet-3.3.1-cp312-cp312-win_arm64.whl", hash = "sha256:bfb2d1763d777de5ee495c85309460f6fd8146e50ec9d0ae0183dbf6f0a829d1", size = 226403, upload-time = "2026-01-23T15:31:39.372Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gripcontrol"
|
||||
version = "4.4.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "pubcontrol" },
|
||||
{ name = "pyjwt", extra = ["crypto"] },
|
||||
{ name = "six" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/4f/51/1cbf88384fbe97a1454fb0adddcdca8cb90ceb99c3250274c334db844f4f/gripcontrol-4.4.0.tar.gz", hash = "sha256:44ee6fe244a02870aa4e5bc810138ccaf5070dce5eb149b8ee9e27b960a95c2d", size = 12526, upload-time = "2026-05-14T21:19:28.49Z" }
|
||||
|
||||
[[package]]
|
||||
name = "grpc-google-iam-v1"
|
||||
version = "0.14.3"
|
||||
@@ -3040,14 +3078,14 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "gunicorn"
|
||||
version = "23.0.0"
|
||||
version = "26.0.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "packaging" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/34/72/9614c465dc206155d93eff0ca20d42e1e35afc533971379482de953521a4/gunicorn-23.0.0.tar.gz", hash = "sha256:f014447a0101dc57e294f6c18ca6b40227a4c90e9bdb586042628030cba004ec", size = 375031, upload-time = "2024-08-10T20:25:27.378Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/6d/b7/a4a3f632f823e432ce6bc65f62961b7980c898c77f075a2f7118cb3846fe/gunicorn-26.0.0.tar.gz", hash = "sha256:ca9346f85e3a4aeeb64d491045c16b9a35647abd37ea15efe53080eb8b090baf", size = 727286, upload-time = "2026-05-05T06:38:25.529Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/cb/7d/6dac2a6e1eba33ee43f318edbed4ff29151a49b5d37f080aad1e6469bca4/gunicorn-23.0.0-py3-none-any.whl", hash = "sha256:ec400d38950de4dfd418cff8328b2c8faed0edb0d517d3394e457c317908ca4d", size = 85029, upload-time = "2024-08-10T20:25:24.996Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/e6/40/9c2384fc2be4ad25dd4a49decd5ad9ea5a3639814c11bd40ab77cb9f0a14/gunicorn-26.0.0-py3-none-any.whl", hash = "sha256:40233d26a5f0d1872916188c276e21641155111c2853f0c2cd55260aec0d24fc", size = 212009, upload-time = "2026-05-05T06:38:23.007Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4510,6 +4548,7 @@ dependencies = [
|
||||
{ name = "django-celery-results" },
|
||||
{ name = "django-cors-headers" },
|
||||
{ name = "django-environ" },
|
||||
{ name = "django-eventstream" },
|
||||
{ name = "django-filter" },
|
||||
{ name = "django-guid" },
|
||||
{ name = "django-postgres-extra" },
|
||||
@@ -4574,6 +4613,7 @@ requires-dist = [
|
||||
{ name = "django-celery-results", specifier = "==2.6.0" },
|
||||
{ name = "django-cors-headers", specifier = "==4.4.0" },
|
||||
{ name = "django-environ", specifier = "==0.11.2" },
|
||||
{ name = "django-eventstream", specifier = "==5.3.3" },
|
||||
{ name = "django-filter", specifier = "==24.3" },
|
||||
{ name = "django-guid", specifier = "==3.5.0" },
|
||||
{ name = "django-postgres-extra", specifier = "==2.0.9" },
|
||||
@@ -4586,7 +4626,7 @@ requires-dist = [
|
||||
{ name = "drf-spectacular-jsonapi", specifier = "==0.5.1" },
|
||||
{ name = "fonttools", specifier = "==4.62.1" },
|
||||
{ name = "gevent", specifier = "==25.9.1" },
|
||||
{ name = "gunicorn", specifier = "==23.0.0" },
|
||||
{ name = "gunicorn", specifier = "==26.0.0" },
|
||||
{ name = "h2", specifier = "==4.3.0" },
|
||||
{ name = "lxml", specifier = "==6.1.0" },
|
||||
{ name = "markdown", specifier = "==3.10.2" },
|
||||
@@ -4674,6 +4714,16 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/7b/08/9c66c269b0d417a0af9fb969535f0371b8c538633535a7a6a5ca3f9231e2/psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab", size = 1163864, upload-time = "2023-10-28T09:37:28.155Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pubcontrol"
|
||||
version = "3.5.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "pyjwt", extra = ["crypto"] },
|
||||
{ name = "requests" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/25/6a/02202a247214a6ffd5148ab1b16aca1c334b40dca211bca0442c8b7c7447/pubcontrol-3.5.0.tar.gz", hash = "sha256:a5ec6b3f53edfd005675518e5e4cc23b34122776835ae7c6dbd1db173d1ff0cb", size = 18199, upload-time = "2023-07-05T19:11:40.477Z" }
|
||||
|
||||
[[package]]
|
||||
name = "py-deviceid"
|
||||
version = "0.1.1"
|
||||
|
||||
Reference in New Issue
Block a user