chore(skills): add comprehensive Celery documentation

- Add Task Composition (Canvas) section: chain, group, combined patterns
- Add Beat Scheduling section: IntervalSchedule, PeriodicTask, race conditions
- Add Advanced Task Patterns: bind=True, get_task_logger, SoftTimeLimitExceeded
- Document @set_tenant behavior (default vs keep_tenant=True)
- Add Celery Configuration section: broker settings, visibility_timeout
- Expand Celery queues table with all 7 queues
- Expand file-locations.md with Task Jobs details
- Add Celery Task Testing section to prowler-test-api (6 subsections)
Adrián Jesús Peña Rodríguez
2026-01-21 13:53:40 +01:00
parent 98ef41fd7f
commit be52263934
3 changed files with 374 additions and 7 deletions

@@ -249,9 +249,251 @@ def get_queryset(self):
| Queue | Purpose |
|-------|---------|
| `scans` | Prowler scan execution |
| `overview` | Dashboard aggregations (severity, attack surface) |
| `compliance` | Compliance report generation |
| `integrations` | External integrations (Jira, S3, Security Hub) |
| `deletion` | Provider/tenant deletion (async) |
| `backfill` | Historical data backfill operations |
| `scan-reports` | Output generation (CSV, JSON, HTML, PDF) |
---
## Task Composition (Canvas)
Use Celery's Canvas primitives for complex workflows:
### Chain (Sequential)
```python
from celery import chain
# Tasks run sequentially: A → B → C
chain(
task_a.si(tenant_id=tenant_id),
task_b.si(tenant_id=tenant_id),
task_c.si(tenant_id=tenant_id),
).apply_async()
```
### Group (Parallel)
```python
from celery import group
# Tasks run in parallel: A, B, C simultaneously
group(
task_a.si(tenant_id=tenant_id),
task_b.si(tenant_id=tenant_id),
task_c.si(tenant_id=tenant_id),
).apply_async()
```
### Combined Patterns (Real Example)
```python
# From tasks/tasks.py - Post-scan workflow
chain(
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
group(
aggregate_daily_severity_task.si(tenant_id=tenant_id, scan_id=scan_id),
generate_outputs_task.si(scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id),
),
group(
generate_compliance_reports_task.si(tenant_id=tenant_id, scan_id=scan_id, provider_id=provider_id),
check_integrations_task.si(tenant_id=tenant_id, provider_id=provider_id, scan_id=scan_id),
),
).apply_async()
```
> **Note:** Use `.si()` (immutable signature) so results are not passed between tasks. Use `.s()` when a task should receive the previous task's result, as in the sketch below.
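For contrast, a minimal sketch using `.s()` with hypothetical task names: each task's return value is passed as the first positional argument of the next task in the chain.
```python
from celery import chain

# Hypothetical tasks, for illustration only: fetch returns findings data,
# summarize receives that result, notify receives the summary.
chain(
    fetch_findings_task.s(tenant_id=tenant_id, scan_id=scan_id),
    summarize_findings_task.s(),  # called as summarize_findings_task(fetch_result)
    notify_summary_task.s(),      # called as notify_summary_task(summary_result)
).apply_async()
```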
---
## Beat Scheduling (Periodic Tasks)
### Creating a Scheduled Task
```python
import json
from datetime import datetime, timedelta, timezone
from django_celery_beat.models import IntervalSchedule, PeriodicTask
# 1. Create or get the schedule
schedule, _ = IntervalSchedule.objects.get_or_create(
every=24,
period=IntervalSchedule.HOURS,
)
# 2. Create the periodic task
periodic_task = PeriodicTask.objects.create(
interval=schedule,
name=f"scan-perform-scheduled-{provider_id}", # Unique name
task="scan-perform-scheduled", # Task name (not function name)
kwargs=json.dumps({
"tenant_id": str(tenant_id),
"provider_id": str(provider_id),
}),
one_off=False,
start_time=datetime.now(timezone.utc) + timedelta(hours=24),
)
```
### Deleting a Scheduled Task
```python
PeriodicTask.objects.filter(name=f"scan-perform-scheduled-{provider_id}").delete()
```
### Avoiding Race Conditions
```python
# Use countdown to ensure DB transaction commits before task runs
perform_scheduled_scan_task.apply_async(
kwargs={"tenant_id": tenant_id, "provider_id": provider_id},
countdown=5, # Wait 5 seconds
)
```
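An alternative sketch using standard Django machinery (not necessarily what this codebase does): dispatch the task only after the surrounding transaction has committed, so no fixed delay is needed.
```python
from django.db import transaction

# The callback runs only once the current transaction commits, so the task
# cannot be picked up before the rows it needs are visible.
transaction.on_commit(
    lambda: perform_scheduled_scan_task.apply_async(
        kwargs={"tenant_id": tenant_id, "provider_id": provider_id}
    )
)
```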
---
## Advanced Task Patterns
### Accessing Task Metadata with `bind=True`
```python
@shared_task(base=RLSTask, bind=True, name="scan-perform-scheduled", queue="scans")
def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
task_id = self.request.id # Current task ID
retries = self.request.retries # Number of retries so far
# Use task_id for tracking
scan_instance.task_id = task_id
scan_instance.save()
```
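Binding also enables in-task retries via `self.retry()`. A hedged sketch follows; the task name, exception type, and retry policy are illustrative, not taken from the codebase.
```python
from celery import shared_task

# RLSTask and check_provider_connection come from the project code shown elsewhere in this doc.
@shared_task(base=RLSTask, bind=True, name="provider-connection-check-retry", queue="scans")
def check_connection_with_retry_task(self, tenant_id: str, provider_id: str):
    try:
        return check_provider_connection(provider_id=provider_id)
    except ConnectionError as exc:
        # Exponential backoff: 10s, 20s, 40s; the task is marked failed after 3 retries.
        raise self.retry(exc=exc, countdown=10 * 2**self.request.retries, max_retries=3)
```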
### Logging with `get_task_logger`
```python
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task(base=RLSTask, name="my-task")
@set_tenant
def my_task(provider_id: str):
# tenant_id is NOT in signature - @set_tenant pops it from kwargs
# but RLS context is already set by the decorator
logger.info(f"Processing provider {provider_id}")
logger.warning("Potential issue detected")
logger.error("Failed to process")
# Call with tenant_id in kwargs (decorator handles it)
my_task.delay(provider_id="...", tenant_id="...")
```
### Handling `SoftTimeLimitExceeded`
```python
from celery.exceptions import SoftTimeLimitExceeded
@shared_task(
base=RLSTask,
soft_time_limit=300, # 5 minutes - raises SoftTimeLimitExceeded
time_limit=360, # 6 minutes - kills task (SIGKILL)
)
@set_tenant(keep_tenant=True) # keep_tenant=True to pass tenant_id to function
def long_running_task(tenant_id: str, scan_id: str):
try:
for batch in large_dataset:
process_batch(batch)
except SoftTimeLimitExceeded:
logger.warning(f"Task soft limit exceeded for scan {scan_id}, saving progress...")
save_partial_progress(scan_id)
raise # Re-raise to mark task as failed
```
### `@set_tenant` Behavior
| Mode | `tenant_id` in kwargs | `tenant_id` passed to function |
|------|----------------------|-------------------------------|
| `@set_tenant` (default) | Popped (removed) | NO - function doesn't receive it |
| `@set_tenant(keep_tenant=True)` | Read but kept | YES - function receives it |
```python
# Example: @set_tenant (default) - tenant_id NOT in function signature
@shared_task(base=RLSTask, name="provider-connection-check")
@set_tenant
def check_provider_connection_task(provider_id: str): # No tenant_id param
return check_provider_connection(provider_id=provider_id)
# Example: @set_tenant(keep_tenant=True) - tenant_id IN function signature
@shared_task(base=RLSTask, name="scan-report", queue="scan-reports")
@set_tenant(keep_tenant=True)
def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str): # Has tenant_id
# tenant_id available for use inside the function
pass
```
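For reference, a simplified sketch of the shape such a decorator takes. This is NOT the implementation in `api/decorators.py`; the RLS setting name and `SET_CONFIG_QUERY` below are assumptions.
```python
from functools import wraps

from django.db import connection

SET_CONFIG_QUERY = "SELECT set_config('api.tenant_id', %s, TRUE)"  # assumed query / setting name


def set_tenant(func=None, *, keep_tenant=False):
    def decorator(task_func):
        @wraps(task_func)
        def wrapper(*args, **kwargs):
            # Read tenant_id; pop it unless the task keeps it in its signature.
            tenant_id = kwargs["tenant_id"] if keep_tenant else kwargs.pop("tenant_id")
            with connection.cursor() as cursor:
                cursor.execute(SET_CONFIG_QUERY, [tenant_id])  # set RLS context
            return task_func(*args, **kwargs)

        return wrapper

    # Supports both @set_tenant and @set_tenant(keep_tenant=True)
    return decorator(func) if func is not None else decorator
```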
### Deferred Execution with `countdown` and `eta`
```python
# Execute after 30 seconds
my_task.apply_async(kwargs={...}, countdown=30)
# Execute at specific time
from datetime import datetime, timezone
my_task.apply_async(
kwargs={...},
eta=datetime(2024, 1, 15, 10, 0, tzinfo=timezone.utc)
)
```
---
## Celery Configuration
### Broker Settings (config/celery.py)
```python
from celery import Celery
celery_app = Celery("tasks")
celery_app.config_from_object("django.conf:settings", namespace="CELERY")
# Visibility timeout - CRITICAL for long-running tasks
# If task takes longer than this, broker assumes worker died and re-queues
BROKER_VISIBILITY_TIMEOUT = 86400 # 24 hours for scan tasks
celery_app.conf.broker_transport_options = {
"visibility_timeout": BROKER_VISIBILITY_TIMEOUT
}
celery_app.conf.result_backend_transport_options = {
"visibility_timeout": BROKER_VISIBILITY_TIMEOUT
}
# Result settings
celery_app.conf.update(
result_extended=True, # Store additional task metadata
result_expires=None, # Never expire results (we manage cleanup)
)
```
### Django Settings (config/settings/celery.py)
```python
CELERY_BROKER_URL = f"redis://{VALKEY_HOST}:{VALKEY_PORT}/{VALKEY_DB}"
CELERY_RESULT_BACKEND = "django-db" # Store results in PostgreSQL # trufflehog:ignore
CELERY_TASK_TRACK_STARTED = True # Track when tasks start
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
```
### Global Time Limits (Optional)
```python
# In settings.py - applies to ALL tasks
CELERY_TASK_SOFT_TIME_LIMIT = 3600 # 1 hour soft limit
CELERY_TASK_TIME_LIMIT = 3660 # 1 hour + 1 minute hard limit
```
---

@@ -57,11 +57,26 @@
| Pattern | File Path | Key Classes/Functions |
|---------|-----------|----------------------|
| **Task Definitions** | `api/src/backend/tasks/tasks.py` | All `@shared_task` definitions |
| **RLS Task Base** | `api/src/backend/config/celery.py` | `RLSTask` base class (creates APITask on dispatch) |
| **Task Decorators** | `api/src/backend/api/decorators.py` | `@set_tenant`, `@handle_provider_deletion` |
| **Task Jobs** | `api/src/backend/tasks/jobs/` | `scan.py`, `deletion.py`, `backfill.py`, `export.py` |
| **Celery Config** | `api/src/backend/config/celery.py` | Celery app, broker settings, visibility timeout |
| **Django Settings** | `api/src/backend/config/settings/celery.py` | `CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND` |
| **Beat Schedule** | `api/src/backend/tasks/beat.py` | `schedule_provider_scan()`, `PeriodicTask` creation |
| **Task Utilities** | `api/src/backend/tasks/utils.py` | `batched()`, `get_next_execution_datetime()` |
### Task Jobs (Business Logic)
| Job File | Purpose |
|----------|---------|
| `tasks/jobs/scan.py` | `perform_prowler_scan()`, `aggregate_findings()`, `aggregate_attack_surface()` |
| `tasks/jobs/deletion.py` | `delete_provider()`, `delete_tenant()` |
| `tasks/jobs/backfill.py` | Historical data backfill operations |
| `tasks/jobs/export.py` | Output file generation (CSV, JSON, HTML) |
| `tasks/jobs/report.py` | PDF report generation (ThreatScore, ENS, NIS2) |
| `tasks/jobs/connection.py` | Provider/integration connection checks |
| `tasks/jobs/integrations.py` | S3, Security Hub, Jira uploads |
| `tasks/jobs/muting.py` | Historical findings muting |
| `tasks/jobs/attack_paths/` | Attack paths scan (Neo4j/Cartography) |
## Key Line References

@@ -98,7 +98,9 @@ def test_cross_tenant_access_denied(self, authenticated_client, tenants_fixture)
---
## 4. Celery Task Testing
### 4.1 Testing Views That Trigger Tasks
**Mock BOTH `.delay()` AND `Task.objects.get`**:
@@ -113,6 +115,114 @@ def test_async_delete(self, mock_task, mock_task_get, authenticated_client, prov
```python
    response = authenticated_client.delete(reverse("provider-detail", kwargs={"pk": provider.id}))
    assert response.status_code == status.HTTP_202_ACCEPTED
    mock_task.assert_called_once()
```
### 4.2 Testing Task Logic Directly
Use `apply()` for synchronous execution without a Celery worker:
```python
@pytest.mark.django_db
def test_task_logic_directly(self, tenants_fixture, providers_fixture):
tenant = tenants_fixture[0]
provider = providers_fixture[0]
# Execute task synchronously (no broker needed)
result = check_provider_connection_task.apply(
kwargs={"tenant_id": str(tenant.id), "provider_id": str(provider.id)}
)
assert result.successful()
assert result.result["connected"] is True
```
### 4.3 Testing Canvas (chain/group)
Mock the entire chain to verify task orchestration:
```python
@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
def test_post_scan_workflow(self, mock_group, mock_chain, tenants_fixture):
tenant = tenants_fixture[0]
# Mock chain.apply_async
mock_chain_instance = Mock()
mock_chain.return_value = mock_chain_instance
_perform_scan_complete_tasks(str(tenant.id), "scan-123", "provider-456")
# Verify chain was called
assert mock_chain.called
mock_chain_instance.apply_async.assert_called()
```
### 4.4 Why NOT to Use `task_always_eager`
> **Warning:** `CELERY_TASK_ALWAYS_EAGER = True` is NOT recommended for testing.
| Problem | Impact |
|---------|--------|
| No actual task serialization | Misses argument type errors |
| No broker interaction | Hides connection issues |
| Different execution context | `self.request` behaves differently |
| Results not stored by default | `task.result` returns `None` |
**Instead, use:**
- `task.apply()` for synchronous execution
- Mocking for isolation
- `pytest-celery` for integration tests (see the sketch below)
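A minimal sketch of the Celery pytest fixtures (`celery_app`, `celery_worker`, provided by `pytest-celery` / `celery.contrib.pytest`); the in-memory broker and backend URLs are assumptions for a self-contained test.
```python
import pytest


@pytest.fixture(scope="session")
def celery_config():
    # In-memory broker and result backend: no external services required.
    return {"broker_url": "memory://", "result_backend": "cache+memory://"}


def test_task_through_embedded_worker(celery_app, celery_worker):
    # Register a throwaway task on the test app and let the embedded worker load it.
    @celery_app.task
    def add(x, y):
        return x + y

    celery_worker.reload()
    assert add.delay(2, 3).get(timeout=10) == 5
```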
### 4.5 Testing Tasks with `@set_tenant`
The `@set_tenant` decorator pops `tenant_id` from kwargs (unless `keep_tenant=True`).
```python
from unittest.mock import patch, Mock
from tasks.tasks import check_provider_connection_task
@pytest.mark.django_db
class TestSetTenantDecorator:
@patch("api.decorators.connection")
def test_sets_rls_context(self, mock_conn, tenants_fixture, providers_fixture):
tenant = tenants_fixture[0]
provider = providers_fixture[0]
# Call task with tenant_id - decorator sets RLS and pops it
check_provider_connection_task.apply(
kwargs={"tenant_id": str(tenant.id), "provider_id": str(provider.id)}
)
# Verify SET_CONFIG_QUERY was executed
mock_conn.cursor.return_value.__enter__.return_value.execute.assert_called()
```
### 4.6 Testing Beat Scheduled Tasks
```python
from unittest.mock import patch, Mock
from django_celery_beat.models import PeriodicTask
from tasks.beat import schedule_provider_scan
@pytest.mark.django_db
class TestBeatScheduling:
@patch("tasks.beat.perform_scheduled_scan_task.apply_async")
def test_schedule_provider_scan(self, mock_apply, providers_fixture):
provider = providers_fixture[0]
mock_apply.return_value = Mock(id="task-123")
schedule_provider_scan(provider)
# Verify periodic task created
assert PeriodicTask.objects.filter(
name=f"scan-perform-scheduled-{provider.id}"
).exists()
# Verify immediate execution with countdown
mock_apply.assert_called_once()
call_kwargs = mock_apply.call_args
assert call_kwargs.kwargs.get("countdown") == 5
```
---