From 30d737c7d7ce99349c46b067b684614b4843d29e Mon Sep 17 00:00:00 2001 From: abdou Date: Mon, 22 Jun 2026 14:05:11 +0200 Subject: [PATCH] fix(api): bound Celery worker concurrency to a configurable default (#11075) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Adrián Jesús Peña Rodríguez --- api/.env.example | 3 ++ api/CHANGELOG.md | 1 + api/docs/orphan-task-recovery.md | 1 + .../backend/api/tests/test_celery_settings.py | 27 ++++++++++++++ api/src/backend/api/tests/test_views.py | 4 ++- api/src/backend/config/settings/celery.py | 5 +++ docs/troubleshooting.mdx | 35 +++++++++++++++++++ 7 files changed, 75 insertions(+), 1 deletion(-) diff --git a/api/.env.example b/api/.env.example index 0be5388790..f97d868890 100644 --- a/api/.env.example +++ b/api/.env.example @@ -24,6 +24,9 @@ DJANGO_THROTTLE_TOKEN_OBTAIN=50/minute # Decide whether to allow Django manage database table partitions DJANGO_MANAGE_DB_PARTITIONS=[True|False] DJANGO_CELERY_DEADLOCK_ATTEMPTS=5 +# Optional: bound Celery's prefork pool size. Unset → Celery uses os.cpu_count(). +# Useful on Kubernetes nodes with many CPUs where unbounded prefork balloons memory. +# DJANGO_CELERY_WORKER_CONCURRENCY=4 DJANGO_BROKER_VISIBILITY_TIMEOUT=86400 DJANGO_SENTRY_DSN= diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 6073f71f0c..cb9d6fb179 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -9,6 +9,7 @@ All notable changes to the **Prowler API** are documented in this file. - Provider group filters for API endpoints that support cloud provider filtering, including exact and `__in` variants [(#11573)](https://github.com/prowler-cloud/prowler/pull/11573) - Provider filters for `GET /api/v1/compliance-overviews`, `/metadata`, and `/requirements`, using latest completed scans per matching provider [(#11587)](https://github.com/prowler-cloud/prowler/pull/11587) - Server-Sent Events (SSE) infrastructure for the API: a base viewset, a tenant-aware channel manager, and channel-name helpers backed by `django-eventstream` over Valkey Pub/Sub and served through the Gunicorn ASGI worker, so feature endpoints can stream events to clients over a single long-lived connection [(#11556)](https://github.com/prowler-cloud/prowler/pull/11556) +- `DJANGO_CELERY_WORKER_CONCURRENCY` to configure Celery workers concurrency. Unset for default behaviour [(#11075)](https://github.com/prowler-cloud/prowler/pull/11075) ### 🔄 Changed diff --git a/api/docs/orphan-task-recovery.md b/api/docs/orphan-task-recovery.md index a47b4f36a9..af4eefc1ac 100644 --- a/api/docs/orphan-task-recovery.md +++ b/api/docs/orphan-task-recovery.md @@ -65,6 +65,7 @@ All settings have safe defaults; override via environment variables. | Env var | Default | Purpose | | --- | --- | --- | | `DJANGO_CELERY_WORKER_PREFETCH_MULTIPLIER` | `1` | Tasks reserved per worker process. | +| `DJANGO_CELERY_WORKER_CONCURRENCY` | unset | Optional Celery prefork pool size. When unset, Celery uses its CPU-based default. Set this on worker containers to bound idle memory on hosts with many CPUs. | | `DJANGO_CELERY_WORKER_SOFT_SHUTDOWN_TIMEOUT` | `60` | Seconds the worker drains/re-queues on `SIGTERM` before force-kill. | | `DJANGO_CELERY_TASK_TIME_LIMIT` | `21600` (6h) | Hard limit for most tasks; connection checks are capped at 120s. | | `DJANGO_CELERY_TASK_SOFT_TIME_LIMIT` | hard - 600 | Soft limit; raises `SoftTimeLimitExceeded` for cleanup. | diff --git a/api/src/backend/api/tests/test_celery_settings.py b/api/src/backend/api/tests/test_celery_settings.py index d4010796ee..dcd8930edc 100644 --- a/api/src/backend/api/tests/test_celery_settings.py +++ b/api/src/backend/api/tests/test_celery_settings.py @@ -41,3 +41,30 @@ class TestBuildCeleryBrokerUrl: def test_invalid_scheme_raises_error(self): with pytest.raises(ValueError, match="Invalid VALKEY_SCHEME 'http'"): _build_celery_broker_url("http", "", "", "valkey", "6379", "0") + + +class TestCeleryWorkerConcurrency: + def _reimport_settings(self): + """Fresh import — importlib.reload() doesn't clear the module namespace, + so an attribute set by a prior test would leak into the unset case.""" + import sys + + sys.modules.pop("config.settings.celery", None) + import config.settings.celery as celery_settings + + return celery_settings + + def test_unset_leaves_setting_absent(self, monkeypatch): + monkeypatch.delenv("DJANGO_CELERY_WORKER_CONCURRENCY", raising=False) + mod = self._reimport_settings() + assert not hasattr(mod, "CELERY_WORKER_CONCURRENCY") + + def test_explicit_value_applied(self, monkeypatch): + monkeypatch.setenv("DJANGO_CELERY_WORKER_CONCURRENCY", "8") + mod = self._reimport_settings() + assert mod.CELERY_WORKER_CONCURRENCY == 8 + + def test_invalid_value_raises(self, monkeypatch): + monkeypatch.setenv("DJANGO_CELERY_WORKER_CONCURRENCY", "not-a-number") + with pytest.raises(ValueError): + self._reimport_settings() diff --git a/api/src/backend/api/tests/test_views.py b/api/src/backend/api/tests/test_views.py index 49234777da..613cef4755 100644 --- a/api/src/backend/api/tests/test_views.py +++ b/api/src/backend/api/tests/test_views.py @@ -13743,7 +13743,9 @@ class TestTenantFinishACSView: .filter(user=user, tenant=victim_tenant) .exists() ) - assert not SAMLToken.objects.using(MainRouter.admin_db).filter(user=user).exists() + assert ( + not SAMLToken.objects.using(MainRouter.admin_db).filter(user=user).exists() + ) def test_rollback_saml_user_when_error_occurs(self, users_fixture, monkeypatch): """Test that a user is properly deleted when created during SAML flow and an error occurs""" diff --git a/api/src/backend/config/settings/celery.py b/api/src/backend/config/settings/celery.py index b7030ebea4..b7105a548c 100644 --- a/api/src/backend/config/settings/celery.py +++ b/api/src/backend/config/settings/celery.py @@ -53,3 +53,8 @@ CELERY_TASK_TRACK_STARTED = True CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True CELERY_DEADLOCK_ATTEMPTS = env.int("DJANGO_CELERY_DEADLOCK_ATTEMPTS", default=5) + +# Opt-in override for Celery's prefork pool size. When unset, Celery falls back +# to its default (os.cpu_count()). +if "DJANGO_CELERY_WORKER_CONCURRENCY" in env.ENVIRON: + CELERY_WORKER_CONCURRENCY = env.int("DJANGO_CELERY_WORKER_CONCURRENCY") diff --git a/docs/troubleshooting.mdx b/docs/troubleshooting.mdx index 50cc43a3c0..3125d74ef5 100644 --- a/docs/troubleshooting.mdx +++ b/docs/troubleshooting.mdx @@ -2,6 +2,8 @@ title: 'Troubleshooting' --- +import { VersionBadge } from "/snippets/version-badge.mdx" + ## Running `prowler` I get `[File: utils.py:15] [Module: utils] CRITICAL: path/redacted: OSError[13]` That is an error related to file descriptors or opened files allowed by your operating system. @@ -81,6 +83,39 @@ docker compose down docker compose up -d ``` +### Worker Uses Too Much Memory on Hosts with Many CPUs + + + +When Prowler App runs self-hosted on a machine or Kubernetes node with many CPUs, +the Celery worker may create one prefork process per detected CPU if concurrency +is not configured explicitly. Each process loads the SDK runtime and cloud +provider clients, so idle memory can be high and worker containers can be +terminated by their memory limit. + +Set `DJANGO_CELERY_WORKER_CONCURRENCY` in the worker runtime environment to cap +the number of prefork processes: + +```yaml +services: + worker: + environment: + DJANGO_CELERY_WORKER_CONCURRENCY: "4" +``` + +For Kubernetes deployments, set the same variable on the worker Deployment: + +```yaml +env: + - name: DJANGO_CELERY_WORKER_CONCURRENCY + value: "4" +``` + +Lower values reduce idle memory and the number of tasks a worker can run in +parallel. Increase the value only when the worker has enough memory for the +expected scan workload. Leaving the variable unset preserves Celery's default +CPU-based concurrency. + ### API Container Fails to Start with JWT Key Permission Error See [GitHub Issue #8897](https://github.com/prowler-cloud/prowler/issues/8897) for more details.