mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
fix(api): bound Celery worker concurrency to a configurable default (#11075)
Co-authored-by: Adrián Jesús Peña Rodríguez <adrianjpr@gmail.com>
This commit is contained in:
@@ -24,6 +24,9 @@ DJANGO_THROTTLE_TOKEN_OBTAIN=50/minute
|
||||
# Decide whether to allow Django manage database table partitions
|
||||
DJANGO_MANAGE_DB_PARTITIONS=[True|False]
|
||||
DJANGO_CELERY_DEADLOCK_ATTEMPTS=5
|
||||
# Optional: bound Celery's prefork pool size. Unset → Celery uses os.cpu_count().
|
||||
# Useful on Kubernetes nodes with many CPUs where unbounded prefork balloons memory.
|
||||
# DJANGO_CELERY_WORKER_CONCURRENCY=4
|
||||
DJANGO_BROKER_VISIBILITY_TIMEOUT=86400
|
||||
DJANGO_SENTRY_DSN=
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
- Provider group filters for API endpoints that support cloud provider filtering, including exact and `__in` variants [(#11573)](https://github.com/prowler-cloud/prowler/pull/11573)
|
||||
- Provider filters for `GET /api/v1/compliance-overviews`, `/metadata`, and `/requirements`, using latest completed scans per matching provider [(#11587)](https://github.com/prowler-cloud/prowler/pull/11587)
|
||||
- Server-Sent Events (SSE) infrastructure for the API: a base viewset, a tenant-aware channel manager, and channel-name helpers backed by `django-eventstream` over Valkey Pub/Sub and served through the Gunicorn ASGI worker, so feature endpoints can stream events to clients over a single long-lived connection [(#11556)](https://github.com/prowler-cloud/prowler/pull/11556)
|
||||
- `DJANGO_CELERY_WORKER_CONCURRENCY` to configure Celery workers concurrency. Unset for default behaviour [(#11075)](https://github.com/prowler-cloud/prowler/pull/11075)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
|
||||
@@ -65,6 +65,7 @@ All settings have safe defaults; override via environment variables.
|
||||
| Env var | Default | Purpose |
|
||||
| --- | --- | --- |
|
||||
| `DJANGO_CELERY_WORKER_PREFETCH_MULTIPLIER` | `1` | Tasks reserved per worker process. |
|
||||
| `DJANGO_CELERY_WORKER_CONCURRENCY` | unset | Optional Celery prefork pool size. When unset, Celery uses its CPU-based default. Set this on worker containers to bound idle memory on hosts with many CPUs. |
|
||||
| `DJANGO_CELERY_WORKER_SOFT_SHUTDOWN_TIMEOUT` | `60` | Seconds the worker drains/re-queues on `SIGTERM` before force-kill. |
|
||||
| `DJANGO_CELERY_TASK_TIME_LIMIT` | `21600` (6h) | Hard limit for most tasks; connection checks are capped at 120s. |
|
||||
| `DJANGO_CELERY_TASK_SOFT_TIME_LIMIT` | hard - 600 | Soft limit; raises `SoftTimeLimitExceeded` for cleanup. |
|
||||
|
||||
@@ -41,3 +41,30 @@ class TestBuildCeleryBrokerUrl:
|
||||
def test_invalid_scheme_raises_error(self):
|
||||
with pytest.raises(ValueError, match="Invalid VALKEY_SCHEME 'http'"):
|
||||
_build_celery_broker_url("http", "", "", "valkey", "6379", "0")
|
||||
|
||||
|
||||
class TestCeleryWorkerConcurrency:
|
||||
def _reimport_settings(self):
|
||||
"""Fresh import — importlib.reload() doesn't clear the module namespace,
|
||||
so an attribute set by a prior test would leak into the unset case."""
|
||||
import sys
|
||||
|
||||
sys.modules.pop("config.settings.celery", None)
|
||||
import config.settings.celery as celery_settings
|
||||
|
||||
return celery_settings
|
||||
|
||||
def test_unset_leaves_setting_absent(self, monkeypatch):
|
||||
monkeypatch.delenv("DJANGO_CELERY_WORKER_CONCURRENCY", raising=False)
|
||||
mod = self._reimport_settings()
|
||||
assert not hasattr(mod, "CELERY_WORKER_CONCURRENCY")
|
||||
|
||||
def test_explicit_value_applied(self, monkeypatch):
|
||||
monkeypatch.setenv("DJANGO_CELERY_WORKER_CONCURRENCY", "8")
|
||||
mod = self._reimport_settings()
|
||||
assert mod.CELERY_WORKER_CONCURRENCY == 8
|
||||
|
||||
def test_invalid_value_raises(self, monkeypatch):
|
||||
monkeypatch.setenv("DJANGO_CELERY_WORKER_CONCURRENCY", "not-a-number")
|
||||
with pytest.raises(ValueError):
|
||||
self._reimport_settings()
|
||||
|
||||
@@ -13743,7 +13743,9 @@ class TestTenantFinishACSView:
|
||||
.filter(user=user, tenant=victim_tenant)
|
||||
.exists()
|
||||
)
|
||||
assert not SAMLToken.objects.using(MainRouter.admin_db).filter(user=user).exists()
|
||||
assert (
|
||||
not SAMLToken.objects.using(MainRouter.admin_db).filter(user=user).exists()
|
||||
)
|
||||
|
||||
def test_rollback_saml_user_when_error_occurs(self, users_fixture, monkeypatch):
|
||||
"""Test that a user is properly deleted when created during SAML flow and an error occurs"""
|
||||
|
||||
@@ -53,3 +53,8 @@ CELERY_TASK_TRACK_STARTED = True
|
||||
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
|
||||
|
||||
CELERY_DEADLOCK_ATTEMPTS = env.int("DJANGO_CELERY_DEADLOCK_ATTEMPTS", default=5)
|
||||
|
||||
# Opt-in override for Celery's prefork pool size. When unset, Celery falls back
|
||||
# to its default (os.cpu_count()).
|
||||
if "DJANGO_CELERY_WORKER_CONCURRENCY" in env.ENVIRON:
|
||||
CELERY_WORKER_CONCURRENCY = env.int("DJANGO_CELERY_WORKER_CONCURRENCY")
|
||||
|
||||
Reference in New Issue
Block a user