---
title: 'Server-Sent Events (SSE)'
---

import { VersionBadge } from "/snippets/version-badge.mdx"

<VersionBadge version="1.32.0" />

This guide explains how to add a **Server-Sent Events (SSE)** endpoint to the Prowler API. SSE lets the backend push a one-way stream of events to a client over a single long-lived HTTP connection, which is ideal for live progress, token-by-token LLM output, or any "the server has news for you" use case where the client should not poll.

<Info>
The platform ships the SSE **infrastructure** (`api.sse`) and wiring. No feature endpoint streams over SSE out of the box; this guide shows how to build one on top of the shared base.
</Info>

## When to use SSE

| Need | Use |
|------|-----|
| Server pushes incremental updates, client only reads | **SSE** |
| Bidirectional, low-latency messaging (chat both ways, games) | WebSocket |
| Client asks, server answers once | Plain REST |

SSE is the right tool when the **client only consumes**: scan progress, long-running job checkpoints, streamed LLM tokens, cross-client resource-sync notifications. It rides on plain HTTP, reconnects automatically in the browser via the native [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) API, and needs no extra protocol.

## How it works

SSE is wired through [`django-eventstream`](https://github.com/fanout/django_eventstream) and a small platform layer in `api/src/backend/api/sse/`:

| Piece | File | Responsibility |
|-------|------|----------------|
| `BaseSSEViewSet` | `api/sse/base_views.py` | Base DRF viewset a feature subclasses. The feature implements `get_channels`; the base handles auth, the tenant transaction, and delegates streaming to `django-eventstream`. |
| `SSEChannelManager` | `api/sse/channelmanager.py` | Registered in `settings.EVENTSTREAM_CHANNELMANAGER_CLASS`. Reads the channel set off the request and enforces the platform-wide tenant gate. |
| `SSEAuthentication` | `api/authentication.py` | Same JWT/API-key stack as the rest of the API, plus an `?access_token=<jwt>` fallback for browser `EventSource` clients. Lives with the other authentication classes, not in the `sse` package. |
| `make_channel_name` / `tenant_id_from_channel` | `api/sse/utils.py` | Single source of truth for the channel-name format, so publishers and the channel manager agree byte-for-byte. |
| Settings | `config/settings/eventstream.py` | Valkey Pub/Sub backend (dedicated DB), channel manager, allowed headers. |

### Transport: the server runs on ASGI

SSE connections are long-lived. Holding one open per synchronous worker would exhaust the worker pool, so the API runs under Gunicorn's native **`asgi` worker** (`config.asgi:application`). Streams are parked on the event loop while ordinary CRUD endpoints keep their synchronous execution (Django runs sync views in a thread-sensitive executor under ASGI). This is configured in `config/guniconf.py` and used by both the dev and production entrypoints, so no separate server process is needed.

### The data flow

```
publisher (Celery task / view)              subscriber (browser, CLI)
        │                                          │
        │ send_event(channel, "scan.progress", …)  │ GET …/event-stream
        ▼                                          ▼
  Valkey Pub/Sub  ◄────────────────────►  BaseSSEViewSet.list
  (EVENTSTREAM_VALKEY_DB)                   → get_channels() (RLS-scoped)
                                            → SSEChannelManager (tenant gate)
                                            → StreamingHttpResponse (text/event-stream)
```

A publisher anywhere in the system (most often a Celery task) calls `send_event(channel, event_type, payload)`. `django-eventstream` fans it out over Valkey Pub/Sub to every connection subscribed to that channel.
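
To make the subscriber side concrete, here is a minimal sketch of how a non-browser client could split a `text/event-stream` body into `(event type, payload)` pairs. It relies only on the standard SSE framing (`event:` and `data:` fields, a blank line ending each frame, lines starting with `:` as comments). The `parse_sse` helper and the sample frames are illustrative assumptions; the frames the platform actually emits may carry additional fields such as `id:`.

```python
import json


def parse_sse(raw: str):
    """Yield (event_type, payload) for every complete frame in `raw`."""
    event_type, data_lines = "message", []
    for line in raw.splitlines():
        if line == "":
            # A blank line dispatches whatever frame has been collected.
            if data_lines:
                yield event_type, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, commonly used as a keep-alive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # the format allows one optional space
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)


# Hypothetical stream contents, shaped like the scan example in this guide.
SAMPLE = (
    "event: scan.progress\n"
    'data: {"checked": 42, "total": 100}\n'
    "\n"
    ": keep-alive\n"
    "\n"
    "event: scan.end\n"
    'data: {"scan_id": "hypothetical-id"}\n'
    "\n"
)

frames = list(parse_sse(SAMPLE))
```

A frame that is not yet terminated by a blank line is never yielded, which matches how SSE clients treat a stream that is cut off mid-event.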

## Adding an SSE endpoint to your feature

The example below streams progress for a long-running **scan**. Adapt the resource, prefix, and event names to your feature.

<Steps>

<Step title="Pick a channel prefix">

Channels follow the format `<prefix>:<tenant_id>:<resource_id>`, built only through `make_channel_name`. The prefix is owned by your feature and may contain hyphens but **never colons** (the parser splits on `:`).

```python
CHANNEL_PREFIX = "scan-progress"
```

The tenant id is baked into every channel name. That is what lets the platform enforce cross-tenant isolation without knowing anything about your feature.
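
The real helpers live in `api/sse/utils.py` and their implementation is not reproduced in this guide. The sketch below is only a stand-in that illustrates the format described above and why a colon in the prefix would break parsing. The names `build_channel` and `parse_tenant` are hypothetical, and the assumption that tenant ids are UUIDs is made for the example only.

```python
import uuid


def build_channel(prefix: str, tenant_id, resource_id) -> str:
    # Hypothetical stand-in for make_channel_name.
    if not prefix or ":" in prefix:
        raise ValueError("prefix must be non-empty and contain no colons")
    return f"{prefix}:{tenant_id}:{resource_id}"


def parse_tenant(channel: str):
    # Hypothetical stand-in for tenant_id_from_channel.
    # Returns None for anything that is not exactly three colon-separated
    # parts with a UUID in the middle.
    parts = channel.split(":")
    if len(parts) != 3:
        return None
    try:
        return uuid.UUID(parts[1])
    except ValueError:
        return None


tenant, scan = uuid.uuid4(), uuid.uuid4()
channel = build_channel("scan-progress", tenant, scan)
```

In feature code, always call the real `make_channel_name` so publishers and the channel manager keep agreeing on the format.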

</Step>

<Step title="Subclass BaseSSEViewSet">

Create the viewset for the SSE sub-resource. The only required method is `get_channels`; it runs inside the tenant transaction set up by the base class, so any database lookup inside it is automatically RLS-scoped.

```python
# scans/event_streams.py
from api.sse import BaseSSEViewSet, make_channel_name
from django.shortcuts import get_object_or_404
from scans.models import Scan

CHANNEL_PREFIX = "scan-progress"


class ScanEventStreamViewSet(BaseSSEViewSet):
    def get_queryset(self):
        # RLS already scopes to the tenant; narrow further as needed
        # (e.g. only scans the requesting user may see).
        return Scan.objects.filter(tenant_id=self.request.tenant_id)

    def get_channels(self) -> set[str]:
        scan = get_object_or_404(self.get_queryset(), pk=self.kwargs["scan_pk"])
        return {make_channel_name(CHANNEL_PREFIX, scan.tenant_id, scan.id)}
```

<Warning>
`get_channels` **must raise** the relevant DRF exception (`NotFound`, `PermissionDenied`, `NotAuthenticated`) when authorization fails. `get_object_or_404` covers the not-found case for you: it raises Django's `Http404`, which DRF normally reports as a 404. Returning an empty set surfaces as django-eventstream's confusing "No channels specified" error instead of the real cause.
</Warning>

</Step>

<Step title="Wire the URL as a sub-resource">

Mount the endpoint as an `event-stream` sub-resource. Keep it **outside the DRF router**, which would force the URL into a list/detail convention. Route the `get` method to the viewset's `list` action.

```python
# scans/urls.py
from django.urls import path

from scans.event_streams import ScanEventStreamViewSet

urlpatterns = [
    path(
        "scans/<uuid:scan_pk>/event-stream",
        ScanEventStreamViewSet.as_view({"get": "list"}),
        name="scan-event-stream",
    ),
]
```

</Step>

<Step title="Define your event vocabulary">

A feature owns its event types in `<app>/<domain>/events.py`: one `publish_<event>` function per event type, each body a **single** `send_event` call so the wire-level string lives in exactly one place.

```python
# scans/events.py
from django_eventstream import send_event


def publish_progress(channel: str, checked: int, total: int) -> None:
    send_event(channel, "scan.progress", {"checked": checked, "total": total})


def publish_end(channel: str, scan_id: str) -> None:
    # Terminal event carries the canonical id so reconnecting clients
    # can refetch the persisted resource over REST.
    send_event(channel, "scan.end", {"scan_id": scan_id})


def publish_error(channel: str, code: str, detail: str) -> None:
    send_event(channel, "scan.error", {"code": code, "detail": detail})
```

There is no platform-side enum, registry, or dispatch table: **the naming convention is the contract** (see below).

</Step>

<Step title="Publish from the producer">

Wherever the work happens (usually a Celery task), build the channel the same way and publish:

```python
from api.sse import make_channel_name
from scans.events import publish_progress, publish_end

channel = make_channel_name("scan-progress", scan.tenant_id, scan.id)
publish_progress(channel, checked=42, total=100)
...
publish_end(channel, scan_id=str(scan.id))
```

</Step>

</Steps>

## Event naming convention

Every event uses an event type of the form **`<resource>.<verb>`** (lowercased, dot-separated). The verb comes from this platform-wide vocabulary. If you need a verb that is not listed, document the addition in this guide so the catalog stays discoverable.

| Verb | When to use |
|------|-------------|
| `delta` | An incremental piece of a stream the client concatenates (LLM text tokens, audio chunks). Standard term across OpenAI / Anthropic / LiteLLM / Vercel AI SDK. |
| `start` | Begin marker for a compound operation (e.g. a tool call whose execution will be reported by a matching `end`). |
| `end` | Terminal marker. Carries the canonical resource id so reconnecting clients can refetch persisted state via REST. |
| `progress` | Periodic checkpoint with quantifiable completion, e.g. `{"checked": 42, "total": 100}`. |
| `created` / `updated` / `deleted` | Resource-lifecycle events for cross-client sync streams. |
| `error` | Terminal failure. Carries a stable `code` for client switching and a human-readable `detail`. |

<Note>
Payloads are **flat JSON**. The wire-level `event:` field already names the event type, so do **not** wrap the payload in `{"type": ..., "data": ...}`. Include the canonical resource UUID on terminal events so reconnecting clients can reconcile via REST.
</Note>
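
Since the convention is not enforced by any registry, a feature's test suite may want to check its event types itself. The sketch below is one possible check, not part of the platform: `is_valid_event_type` is a hypothetical helper, and the character set allowed for the resource part is an assumption, as the guide only requires lowercase and a dot separator.

```python
import re

# Verbs taken from the table above.
VERBS = {"delta", "start", "end", "progress", "created", "updated", "deleted", "error"}

# Assumed shape: lowercase resource (letters, digits, "_" or "-"), one dot, lowercase verb.
EVENT_TYPE_RE = re.compile(r"[a-z][a-z0-9_-]*\.[a-z]+")


def is_valid_event_type(event_type: str) -> bool:
    """Return True when `event_type` looks like `<resource>.<verb>` with a known verb."""
    if not EVENT_TYPE_RE.fullmatch(event_type):
        return False
    verb = event_type.rsplit(".", 1)[1]
    return verb in VERBS
```

For example, `is_valid_event_type("scan.progress")` passes, while `"Scan.Progress"` (not lowercase) and `"scan.finished"` (verb not in the vocabulary) are rejected.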

## Authentication

SSE endpoints use the same authentication stack as the rest of the API. Non-browser clients (CLI, programmatic) send the standard `Authorization` header (JWT or API key).

Browser `EventSource` is the only widely available SSE client API and it **cannot set custom headers**. For that case only, the endpoint accepts a JWT via the `?access_token=<jwt>` query parameter. The header always wins when present: a header is intentional, while a query parameter can leak into referers and logs, so it is consulted only as a fallback.

```javascript
// Browser
const es = new EventSource(
  `/api/v1/scans/${scanId}/event-stream?access_token=${jwt}`
);
```

```bash
# CLI / programmatic: header, exactly like every other endpoint
curl -N -H "Authorization: Bearer $JWT" \
  https://<host>/api/v1/scans/$SCAN_ID/event-stream
```
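
The precedence rule can be pictured with a small sketch. This is not the code of `SSEAuthentication`; `resolve_token` is a hypothetical function that models only the JWT bearer case (no API keys) and assumes that a present but unusable header is rejected rather than silently replaced by the query parameter.

```python
from typing import Optional


def resolve_token(headers: dict, query: dict) -> Optional[str]:
    """Pick the JWT to validate: Authorization header first, query param as fallback."""
    auth = headers.get("Authorization")
    if auth is not None:
        # A header was sent on purpose, so the query parameter is never consulted.
        scheme, _, token = auth.partition(" ")
        token = token.strip()
        return token if scheme.lower() == "bearer" and token else None
    # No header at all: the browser EventSource case.
    return query.get("access_token") or None
```

With both sources present, `resolve_token({"Authorization": "Bearer abc"}, {"access_token": "xyz"})` returns the header value.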

## Tenant isolation & security model

Authorization is enforced at two layers:

1. **At connect**, `get_channels` runs under the regular DRF stack inside the tenant transaction (`rls_transaction`). Resource lookups are RLS-scoped, so a user cannot even resolve a channel for a resource they cannot see. Narrow the queryset further (e.g. `created_by=request.user`) when a resource is per-user within a tenant.
2. **After connect**, `SSEChannelManager.can_read_channel` re-verifies tenant membership by parsing the tenant id embedded in the channel name. Cross-tenant subscription is rejected even if a URL-level check ever has a bug. A malformed channel name is treated as "not authorized".

Because the tenant id lives inside the channel name, this gate works for any feature without the platform knowing anything about it.
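
The second layer can be summarized as a fail-closed comparison. The function below is a simplified illustration rather than the body of `SSEChannelManager.can_read_channel`: `can_read` is a hypothetical name, and it takes a single tenant id as a plain string where the real method receives a user object.

```python
def can_read(user_tenant_id: str, channel: str) -> bool:
    """Allow a subscription only when the channel's tenant matches the user's."""
    parts = channel.split(":")
    # Expected shape is <prefix>:<tenant_id>:<resource_id>; anything else
    # is treated as not authorized instead of raising.
    if len(parts) != 3 or not all(parts):
        return False
    return parts[1] == str(user_tenant_id)
```

Returning `False` for malformed names keeps the gate closed by default, so a bug in how a feature builds its channels results in a refused connection and not a cross-tenant leak.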

## Reconnect & state recovery

The platform deliberately ships **without server-side replay** (`is_channel_reliable` returns `False`). When a client reconnects, it does **not** receive missed events. Instead:

- Terminal events (`*.end`) carry the canonical resource **UUID**.
- On reconnect, the client refetches the authoritative state from the normal REST endpoint using that id.

Design your event payloads accordingly: deltas are ephemeral and concatenated in-flight; the durable truth always lives behind a REST resource.
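
On the consuming side, this contract could translate into state handling along the following lines. The `ScanView` class is a hypothetical client-side model written for this guide; it only records when a REST refetch is needed and leaves the HTTP call itself out.

```python
from dataclasses import dataclass


@dataclass
class ScanView:
    """Client-side view of one scan, fed by SSE events."""

    scan_id: str
    checked: int = 0
    total: int = 0
    finished: bool = False
    needs_refetch: bool = False

    def on_event(self, event_type: str, payload: dict) -> None:
        if event_type == "scan.progress":
            # Ephemeral: good enough for a progress bar, never persisted.
            self.checked, self.total = payload["checked"], payload["total"]
        elif event_type == "scan.end":
            # Terminal event carries the canonical id; REST holds the truth.
            self.scan_id = payload["scan_id"]
            self.finished = True
            self.needs_refetch = True

    def on_reconnect(self) -> None:
        # Missed events are not replayed, so local progress may be stale.
        self.needs_refetch = True


view = ScanView(scan_id="hypothetical-id")
view.on_event("scan.progress", {"checked": 42, "total": 100})
view.on_reconnect()
```

After `on_reconnect`, the caller would fetch the scan over the regular REST endpoint and clear `needs_refetch` once the response has been applied.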

## Local development

- The dev and production entrypoints both launch Gunicorn with the `asgi` worker (`config.asgi:application`). In dev, `DJANGO_DEBUG=True` enables hot reload; `preload_app` is automatically disabled under debug so edited code is picked up.
- SSE uses a **dedicated Valkey database** (`EVENTSTREAM_VALKEY_DB`, default `2`) kept separate from the Celery broker so a noisy broker cannot crowd out streaming traffic. It reuses the same `VALKEY_*` connection settings as the rest of the platform.

| Env var | Default | Purpose |
|---------|---------|---------|
| `EVENTSTREAM_VALKEY_DB` | `2` | Valkey DB index for the SSE Pub/Sub bus |
| `DJANGO_WORKER_CLASS` | `asgi` | Gunicorn worker class |

Test the stream end to end with `curl -N` (disable buffering) and an auth header:

```bash
curl -N -H "Authorization: Bearer $JWT" \
  http://localhost:8080/api/v1/scans/$SCAN_ID/event-stream
```

## Testing

The shared platform layer is covered by `api/tests/test_sse.py` (channel parsing, the tenant gate, and auth precedence). For a feature endpoint, test:

- `get_channels` returns the expected channel for an authorized resource and raises `NotFound`/`PermissionDenied` otherwise.
- Each `publish_<event>` helper emits the correct event type and flat payload (mock `send_event`).
- The producer builds the channel with `make_channel_name` using the resource's own `tenant_id`.
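
As an illustration of the second bullet, a publish-helper test might look like the sketch below. To keep it runnable on its own, `send_event` and `publish_progress` are redefined locally as stand-ins for `django_eventstream.send_event` and the helper from `scans/events.py`. In a real suite the patch target would be the dotted path of the feature module (for instance `mock.patch("scans.events.send_event")`), not `globals()`.

```python
from unittest import mock


def send_event(channel, event_type, data):
    # Stand-in for django_eventstream.send_event so the sketch runs alone.
    raise RuntimeError("send_event should be patched in tests")


def publish_progress(channel: str, checked: int, total: int) -> None:
    # Same shape as the helper in scans/events.py.
    send_event(channel, "scan.progress", {"checked": checked, "total": total})


def test_publish_progress_emits_flat_payload():
    fake = mock.Mock()
    # Swap the module-level name for the duration of the call only.
    with mock.patch.dict(globals(), {"send_event": fake}):
        publish_progress("scan-progress:t:r", checked=42, total=100)
    # One call, correct event type, and no {"type": ..., "data": ...} wrapper.
    fake.assert_called_once_with(
        "scan-progress:t:r", "scan.progress", {"checked": 42, "total": 100}
    )


test_publish_progress_emits_flat_payload()
```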