feat(tasks): create overview queue for summaries and overviews (#8214)

This commit is contained in:
Víctor Fernández Poyatos
2025-07-08 13:53:23 +02:00
committed by GitHub
parent 7de9a37edb
commit 3fb0733887
4 changed files with 60 additions and 28 deletions

View File

@@ -11,6 +11,9 @@ All notable changes to the **Prowler API** are documented in this file.
## [v1.9.1] (Prowler v5.8.1)
### Changed
- Summary and overview tasks now use a dedicated queue and no longer propagate errors to compliance tasks [(#8214)](https://github.com/prowler-cloud/prowler/pull/8214)
### Fixed
- Scan with no resources will not trigger legacy code for findings metadata [(#8183)](https://github.com/prowler-cloud/prowler/pull/8183)
- Invitation email comparison case-insensitive [(#8206)](https://github.com/prowler-cloud/prowler/pull/8206)

View File

@@ -32,7 +32,7 @@ start_prod_server() {
start_worker() {
echo "Starting the worker..."
poetry run python -m celery -A config.celery worker -l "${DJANGO_LOGGING_LEVEL:-info}" -Q celery,scans,scan-reports,deletion,backfill -E --max-tasks-per-child 1
poetry run python -m celery -A config.celery worker -l "${DJANGO_LOGGING_LEVEL:-info}" -Q celery,scans,scan-reports,deletion,backfill,overview -E --max-tasks-per-child 1
}
start_worker_beat() {

View File

@@ -37,6 +37,26 @@ from prowler.lib.outputs.finding import Finding as FindingOutput
logger = get_task_logger(__name__)
def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str):
"""
Helper function to perform tasks after a scan is completed.
Args:
tenant_id (str): The tenant ID under which the scan was performed.
scan_id (str): The ID of the scan that was performed.
provider_id (str): The primary key of the Provider instance that was scanned.
"""
create_compliance_requirements_task.apply_async(
kwargs={"tenant_id": tenant_id, "scan_id": scan_id}
)
chain(
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
generate_outputs_task.si(
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
),
).apply_async()
@shared_task(base=RLSTask, name="provider-connection-check")
@set_tenant
def check_provider_connection_task(provider_id: str):
@@ -103,13 +123,7 @@ def perform_scan_task(
checks_to_execute=checks_to_execute,
)
chain(
perform_scan_summary_task.si(tenant_id, scan_id),
create_compliance_requirements_task.si(tenant_id=tenant_id, scan_id=scan_id),
generate_outputs.si(
scan_id=scan_id, provider_id=provider_id, tenant_id=tenant_id
),
).apply_async()
_perform_scan_complete_tasks(tenant_id, scan_id, provider_id)
return result
@@ -214,20 +228,12 @@ def perform_scheduled_scan_task(self, tenant_id: str, provider_id: str):
scheduler_task_id=periodic_task_instance.id,
)
chain(
perform_scan_summary_task.si(tenant_id, scan_instance.id),
create_compliance_requirements_task.si(
tenant_id=tenant_id, scan_id=str(scan_instance.id)
),
generate_outputs.si(
scan_id=str(scan_instance.id), provider_id=provider_id, tenant_id=tenant_id
),
).apply_async()
_perform_scan_complete_tasks(tenant_id, str(scan_instance.id), provider_id)
return result
@shared_task(name="scan-summary")
@shared_task(name="scan-summary", queue="overview")
def perform_scan_summary_task(tenant_id: str, scan_id: str):
return aggregate_findings(tenant_id=tenant_id, scan_id=scan_id)
@@ -243,7 +249,7 @@ def delete_tenant_task(tenant_id: str):
queue="scan-reports",
)
@set_tenant(keep_tenant=True)
def generate_outputs(scan_id: str, provider_id: str, tenant_id: str):
def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
"""
Process findings in batches and generate output files in multiple formats.
@@ -381,7 +387,7 @@ def backfill_scan_resource_summaries_task(tenant_id: str, scan_id: str):
return backfill_resource_scan_summaries(tenant_id=tenant_id, scan_id=scan_id)
@shared_task(base=RLSTask, name="scan-compliance-overviews")
@shared_task(base=RLSTask, name="scan-compliance-overviews", queue="overview")
def create_compliance_requirements_task(tenant_id: str, scan_id: str):
"""
Creates detailed compliance requirement records for a scan.

View File

@@ -3,9 +3,10 @@ from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from tasks.tasks import generate_outputs
from tasks.tasks import _perform_scan_complete_tasks, generate_outputs_task
# TODO Move this to outputs/reports jobs
@pytest.mark.django_db
class TestGenerateOutputs:
def setup_method(self):
@@ -17,7 +18,7 @@ class TestGenerateOutputs:
with patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter:
mock_filter.return_value.exists.return_value = False
result = generate_outputs(
result = generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
@@ -99,7 +100,7 @@ class TestGenerateOutputs:
mock_compress.return_value = "/tmp/zipped.zip"
mock_upload.return_value = "s3://bucket/zipped.zip"
result = generate_outputs(
result = generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
@@ -150,7 +151,7 @@ class TestGenerateOutputs:
True,
]
result = generate_outputs(
result = generate_outputs_task(
scan_id="scan",
provider_id="provider",
tenant_id=self.tenant_id,
@@ -208,7 +209,7 @@ class TestGenerateOutputs:
{"aws": [(lambda x: True, MagicMock())]},
),
):
generate_outputs(
generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
@@ -276,7 +277,7 @@ class TestGenerateOutputs:
}
},
):
result = generate_outputs(
result = generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
@@ -346,7 +347,7 @@ class TestGenerateOutputs:
):
mock_summary.return_value.exists.return_value = True
result = generate_outputs(
result = generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
@@ -407,9 +408,31 @@ class TestGenerateOutputs:
),
):
with caplog.at_level("ERROR"):
generate_outputs(
generate_outputs_task(
scan_id=self.scan_id,
provider_id=self.provider_id,
tenant_id=self.tenant_id,
)
assert "Error deleting output files" in caplog.text
class TestScanCompleteTasks:
@patch("tasks.tasks.create_compliance_requirements_task.apply_async")
@patch("tasks.tasks.perform_scan_summary_task.si")
@patch("tasks.tasks.generate_outputs_task.si")
def test_scan_complete_tasks(
self, mock_outputs_task, mock_scan_summary_task, mock_compliance_tasks
):
_perform_scan_complete_tasks("tenant-id", "scan-id", "provider-id")
mock_compliance_tasks.assert_called_once_with(
kwargs={"tenant_id": "tenant-id", "scan_id": "scan-id"},
)
mock_scan_summary_task.assert_called_once_with(
scan_id="scan-id",
tenant_id="tenant-id",
)
mock_outputs_task.assert_called_once_with(
scan_id="scan-id",
provider_id="provider-id",
tenant_id="tenant-id",
)