Compare commits

...

3 Commits

Author SHA1 Message Date
Adrián Jesús Peña Rodríguez 0796ae44d6 chore: change hash 2026-04-15 09:54:52 +02:00
Adrián Jesús Peña Rodríguez 7f1591780f chore: update changelog 2026-04-15 09:48:22 +02:00
Adrián Jesús Peña Rodríguez 7824bb5e36 fix(api): self-heal finding-group summaries on /latest drift
- Detect drifted provider/day summaries on /finding-groups/latest
- Enqueue targeted reaggregation task with per-tenant throttling
- Pin APITask RLS writes to primary DB to avoid replica routing
- Backfill latest scan per provider/day instead of per day
2026-04-15 09:43:53 +02:00
8 changed files with 502 additions and 15 deletions
+4
View File
@@ -8,6 +8,10 @@ All notable changes to the **Prowler API** are documented in this file.
- Bump Poetry to `2.3.4` in Dockerfile and pre-commit hooks. Regenerate `api/poetry.lock` [(#10681)](https://github.com/prowler-cloud/prowler/pull/10681)
### 🐞 Fixed
- `/finding-groups/latest` now self-heals drifted `FindingGroupDailySummary` rows by detecting missing provider/day summaries and enqueuing a targeted reaggregation task, with per-tenant throttling to keep request latency unaffected [(#10693)](https://github.com/prowler-cloud/prowler/pull/10693)
### 🔐 Security
- `pytest` from 8.2.2 to 9.0.3 to fix CVE-2025-71176 [(#10678)](https://github.com/prowler-cloud/prowler/pull/10678)
+170
View File
@@ -16782,6 +16782,176 @@ class TestFindingGroupViewSet:
assert attrs["resources_total"] == 3
assert attrs["resources_fail"] == 2
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
def test_finding_groups_latest_check_id_self_heals_when_summary_missing(
    self,
    mock_reaggregate_delay,
    authenticated_client,
    providers_fixture,
    resources_fixture,
):
    """If /latest summary rows are missing, trigger background reaggregation."""
    # Arrange: a completed scan with one FAIL finding but no
    # FindingGroupDailySummary rows written for it, i.e. drifted state.
    provider = providers_fixture[0]
    resource = resources_fixture[0]
    check_id = "self_heal_missing_summary_check"
    latest_scan = Scan.objects.create(
        tenant_id=provider.tenant_id,
        provider=provider,
        state=StateChoices.COMPLETED,
        trigger=Scan.TriggerChoices.MANUAL,
        completed_at=datetime.now(timezone.utc),
    )
    finding = Finding.objects.create(
        tenant_id=provider.tenant_id,
        uid="self_heal_missing_summary_finding",
        scan=latest_scan,
        delta="new",
        status="FAIL",
        severity="critical",
        impact="critical",
        check_id=check_id,
        check_metadata={"CheckId": check_id, "checktitle": "Self heal check"},
        first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=5),
        muted=False,
    )
    finding.add_resources([resource])
    # Act: hit /latest filtered to the drifted check id.
    response = authenticated_client.get(
        reverse("finding-group-latest"),
        {"filter[check_id]": check_id},
    )
    assert response.status_code == status.HTTP_200_OK
    data = response.json()["data"]
    # Summaries are the response's source of truth, so the drifted check
    # is absent from the payload...
    assert len(data) == 0
    # ...but a targeted reaggregation for exactly the drifted scan must
    # have been enqueued so subsequent requests see the data.
    mock_reaggregate_delay.assert_called_once_with(
        tenant_id=str(provider.tenant_id),
        scan_ids=[str(latest_scan.id)],
    )
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
def test_finding_groups_latest_check_id_no_self_heal_when_summary_matches(
    self,
    mock_reaggregate_delay,
    authenticated_client,
    providers_fixture,
    resources_fixture,
):
    """When summary and finding-level metrics match, /latest must not trigger reaggregation."""
    # Local import: only this test needs to build summaries synchronously.
    from tasks.jobs.scan import aggregate_finding_group_summaries

    provider = providers_fixture[0]
    resource = resources_fixture[0]
    check_id = "self_heal_in_sync_summary_check"
    latest_scan = Scan.objects.create(
        tenant_id=provider.tenant_id,
        provider=provider,
        state=StateChoices.COMPLETED,
        trigger=Scan.TriggerChoices.MANUAL,
        completed_at=datetime.now(timezone.utc),
    )
    finding = Finding.objects.create(
        tenant_id=provider.tenant_id,
        uid="self_heal_in_sync_summary_finding",
        scan=latest_scan,
        delta="new",
        status="FAIL",
        severity="critical",
        impact="critical",
        check_id=check_id,
        check_metadata={"CheckId": check_id, "checktitle": "Self heal check"},
        first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=5),
        muted=False,
    )
    finding.add_resources([resource])
    # Build the daily-summary rows synchronously so /latest sees data
    # that is in sync with the finding-level state.
    aggregate_finding_group_summaries(
        tenant_id=str(provider.tenant_id),
        scan_id=str(latest_scan.id),
    )
    response = authenticated_client.get(
        reverse("finding-group-latest"),
        {"filter[check_id]": check_id},
    )
    assert response.status_code == status.HTTP_200_OK
    data = response.json()["data"]
    # The summary-backed payload reflects the single FAIL finding.
    assert len(data) == 1
    attrs = data[0]["attributes"]
    assert attrs["check_id"] == check_id
    assert attrs["fail_count"] == 1
    assert attrs["resources_fail"] == 1
    assert attrs["resources_total"] == 1
    # No drift detected -> no background reaggregation enqueued.
    mock_reaggregate_delay.assert_not_called()
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
def test_finding_groups_latest_no_filter_self_heals_when_summary_missing(
    self,
    mock_reaggregate_delay,
    authenticated_client,
    providers_fixture,
    resources_fixture,
):
    """Self-healing must also work without any finding-group filters."""
    # Same drifted-state setup as the filtered test: a completed scan with
    # a finding but no summary rows.
    provider = providers_fixture[0]
    resource = resources_fixture[0]
    check_id = "self_heal_missing_summary_no_filter_check"
    latest_scan = Scan.objects.create(
        tenant_id=provider.tenant_id,
        provider=provider,
        state=StateChoices.COMPLETED,
        trigger=Scan.TriggerChoices.MANUAL,
        completed_at=datetime.now(timezone.utc),
    )
    finding = Finding.objects.create(
        tenant_id=provider.tenant_id,
        uid="self_heal_missing_summary_no_filter_finding",
        scan=latest_scan,
        delta="new",
        status="FAIL",
        severity="critical",
        impact="critical",
        check_id=check_id,
        check_metadata={"CheckId": check_id, "checktitle": "Self heal no filter"},
        first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=2),
        muted=False,
    )
    finding.add_resources([resource])
    # Act: an unfiltered /latest request.
    response = authenticated_client.get(reverse("finding-group-latest"))
    assert response.status_code == status.HTTP_200_OK
    data = response.json()["data"]
    # The drifted check must not appear in the summary-backed payload...
    check_ids = {item["id"] for item in data}
    assert check_id not in check_ids
    # ...and the targeted reaggregation must still be enqueued.
    mock_reaggregate_delay.assert_called_once_with(
        tenant_id=str(provider.tenant_id),
        scan_ids=[str(latest_scan.id)],
    )
@patch("api.v1.views.cache.add", return_value=False)
@patch("api.v1.views.FindingGroupViewSet._latest_scan_ids_missing_summary")
def test_finding_groups_latest_self_heal_probe_throttled(
    self,
    mock_missing_summary_check,
    mock_cache_add,
    authenticated_client,
    finding_groups_fixture,
):
    """When probe cache is warm, /latest should skip drift-detection queries."""
    # cache.add returning False simulates a probe key that already exists,
    # i.e. another request probed within the TTL window.
    response = authenticated_client.get(reverse("finding-group-latest"))
    assert response.status_code == status.HTTP_200_OK
    # The drift-detection query path must not run at all...
    mock_missing_summary_check.assert_not_called()
    # ...and only the single throttle check touched the cache.
    mock_cache_add.assert_called_once()
def test_finding_groups_latest_provider_type_filter(
self, authenticated_client, finding_groups_fixture
):
+163 -4
View File
@@ -1,5 +1,6 @@
import fnmatch
import glob
import hashlib
import json
import logging
import os
@@ -28,6 +29,7 @@ from dj_rest_auth.registration.views import SocialLoginView
from django.conf import settings as django_settings
from django.contrib.postgres.aggregates import ArrayAgg, BoolAnd, StringAgg
from django.contrib.postgres.search import SearchQuery
from django.core.cache import cache
from django.db import transaction
from django.db.models import (
BooleanField,
@@ -97,6 +99,7 @@ from tasks.tasks import (
mute_historical_findings_task,
perform_scan_task,
reaggregate_all_finding_group_summaries_task,
reaggregate_finding_group_summaries_for_scans_task,
refresh_lighthouse_provider_models_task,
)
@@ -7350,6 +7353,15 @@ class FindingGroupViewSet(BaseRLSViewSet):
"provider.alias": "provider_alias",
}
# Self-heal throttling for /finding-groups/latest:
# - Probe TTL (60s): run drift detection at most once per minute per tenant to
# avoid extra DB checks on every request.
# - Trigger TTL (300s): enqueue at most one reaggregation every 5 minutes per
# tenant *for the same drifted scan set* to prevent task storms while still
# healing stale summaries quickly.
_LATEST_SELF_HEAL_PROBE_CACHE_TTL_SECONDS = 60
_LATEST_SELF_HEAL_TRIGGER_CACHE_TTL_SECONDS = 300
def _validate_sort_fields(self, sort_param, sort_field_map=None):
"""Validate and map JSON:API sort fields using the given field map."""
if sort_field_map is None:
@@ -7671,6 +7683,144 @@ class FindingGroupViewSet(BaseRLSViewSet):
)
return self._aggregate_daily_summaries(clean_queryset)
def _should_self_heal_latest(self, finding_params: QueryDict) -> bool:
    """Decide whether this /latest request should run summary-drift detection.

    Drift detection only makes sense on the summary-backed path; requests
    that fall back to finding-level aggregation are excluded.
    """
    return not self._requires_finding_level_aggregation(
        finding_params, latest=True
    )
def _latest_self_heal_probe_cache_key(self) -> str:
return f"finding_groups:self_heal:probe:tenant:{self.request.tenant_id}"
def _latest_self_heal_trigger_cache_key(self, scan_ids: list[str]) -> str:
scan_key = ",".join(sorted(scan_ids))
digest = hashlib.sha256(scan_key.encode("utf-8")).hexdigest()[:16]
return (
f"finding_groups:self_heal:trigger:tenant:{self.request.tenant_id}:{digest}"
)
def _should_probe_latest_self_heal(self) -> bool:
    """Throttle drift-detection probes to keep /latest lightweight under load.

    Returns True only for the first caller inside each probe-TTL window
    per tenant: cache.add sets the key only when it is absent, so later
    requests within the window get False and skip drift detection.
    """
    probe_key = self._latest_self_heal_probe_cache_key()
    window = self._LATEST_SELF_HEAL_PROBE_CACHE_TTL_SECONDS
    return cache.add(probe_key, "1", timeout=window)
def _latest_scans_for_visible_providers(self, finding_params: QueryDict):
    """Return latest completed scans (1 per provider), scoped to RBAC and provider filters."""
    tenant_id = self.request.tenant_id
    role = get_role(self.request.user, tenant_id)
    scans = Scan.objects.filter(
        tenant_id=tenant_id, state=StateChoices.COMPLETED
    )
    # RBAC: limited-visibility roles only see their assigned providers.
    if not role.unlimited_visibility:
        scans = scans.filter(provider_id__in=get_providers(role))
    # Mirror the provider-scoping filters that /finding-groups/latest
    # accepts, so drift detection inspects the same provider set.
    for param, lookup in (
        ("provider_id", "provider_id"),
        ("provider_type", "provider__provider"),
    ):
        value = finding_params.get(param)
        if value:
            scans = scans.filter(**{lookup: value})
    for param, lookup in (
        ("provider_id__in", "provider_id__in"),
        ("provider_type__in", "provider__provider__in"),
    ):
        raw = finding_params.get(param)
        if raw:
            values = [item.strip() for item in raw.split(",") if item.strip()]
            scans = scans.filter(**{lookup: values})
    # DISTINCT ON provider_id with this ordering keeps only the most
    # recently completed scan per provider.
    return list(
        scans.order_by("provider_id", "-completed_at", "-inserted_at")
        .distinct("provider_id")
        .values("id", "provider_id", "completed_at")
    )
def _latest_scan_ids_missing_summary(self, finding_params: QueryDict) -> list[str]:
    """Return latest scan ids whose provider/day has findings but no summary rows."""
    tenant_id = self.request.tenant_id
    latest_scans = self._latest_scans_for_visible_providers(finding_params)
    if not latest_scans:
        return []
    # Only scans that actually produced findings can be drifted: a scan
    # with zero findings legitimately has no summary rows.
    populated_scan_ids = set(
        Finding.all_objects.filter(
            tenant_id=tenant_id,
            scan_id__in=[row["id"] for row in latest_scans],
        )
        .values_list("scan_id", flat=True)
        .distinct()
    )
    # Expected coverage: one (provider, day) entry per populated latest
    # scan that has a completion timestamp.
    expected = {
        (row["provider_id"], row["completed_at"].date()): str(row["id"])
        for row in latest_scans
        if row["id"] in populated_scan_ids and row.get("completed_at")
    }
    if not expected:
        return []
    # Query summaries once over the whole [min-day, max-day] window
    # rather than per provider/day.
    days = [day for _, day in expected]
    window_start = datetime.combine(
        min(days), datetime.min.time(), tzinfo=timezone.utc
    )
    window_end = datetime.combine(
        max(days) + timedelta(days=1), datetime.min.time(), tzinfo=timezone.utc
    )
    present = {
        (provider_id, inserted_at.date())
        for provider_id, inserted_at in FindingGroupDailySummary.objects.filter(
            tenant_id=tenant_id,
            provider_id__in={provider_id for provider_id, _ in expected},
            inserted_at__gte=window_start,
            inserted_at__lt=window_end,
        ).values_list("provider_id", "inserted_at")
    }
    # Whatever is expected but absent from the summaries is drifted.
    return [scan_id for key, scan_id in expected.items() if key not in present]
def _trigger_latest_self_heal_reaggregation(self, scan_ids: list[str]):
    """Enqueue targeted reaggregation at-most-once-per-window per drifted scan set."""
    unique_scan_ids = sorted(set(scan_ids))
    if not unique_scan_ids:
        return
    tenant_id = str(self.request.tenant_id)
    # cache.add only succeeds for the first request in the trigger window,
    # so the same drifted scan set is enqueued at most once per TTL.
    lock_key = self._latest_self_heal_trigger_cache_key(unique_scan_ids)
    acquired = cache.add(
        lock_key, "1", timeout=self._LATEST_SELF_HEAL_TRIGGER_CACHE_TTL_SECONDS
    )
    if not acquired:
        return
    reaggregate_finding_group_summaries_for_scans_task.delay(
        tenant_id=tenant_id,
        scan_ids=unique_scan_ids,
    )
    logger.info(
        "Triggered finding-group summaries self-heal reaggregation for tenant %s (%d scans)",
        tenant_id,
        len(unique_scan_ids),
    )
def _sorted_paginated_response(
self,
request,
@@ -7849,11 +7999,20 @@ class FindingGroupViewSet(BaseRLSViewSet):
finding_params, computed_params = self._split_computed_aggregate_filters(
normalized_params
)
aggregated_qs = self._build_aggregated_queryset(finding_params, latest=True)
aggregated_qs = self._apply_aggregated_computed_filters(
aggregated_qs, computed_params
summary_qs = self._build_aggregated_queryset(finding_params, latest=True)
summary_qs = self._apply_aggregated_computed_filters(
summary_qs, computed_params
)
return self._sorted_paginated_response(request, aggregated_qs)
if (
self._should_self_heal_latest(finding_params)
and self._should_probe_latest_self_heal()
):
drifted_scan_ids = self._latest_scan_ids_missing_summary(finding_params)
if drifted_scan_ids:
self._trigger_latest_self_heal_reaggregation(drifted_scan_ids)
return self._sorted_paginated_response(request, summary_qs)
@extend_schema(
summary="List resources for a finding group",
+5 -2
View File
@@ -1,7 +1,6 @@
import warnings
from celery import Celery, Task
from config.env import env
# Suppress specific warnings from django-rest-auth: https://github.com/iMerica/dj-rest-auth/issues/684
@@ -57,7 +56,11 @@ class RLSTask(Task):
from api.db_utils import rls_transaction
tenant_id = kwargs.get("tenant_id")
with rls_transaction(tenant_id):
# Tasks metadata rows are writes and must always run on primary DB.
# If the request thread is currently pinned to a read-replica alias,
# rls_transaction would otherwise set tenant context on that replica
# connection while the write is routed to default, breaking RLS.
with rls_transaction(tenant_id, using="default"):
APITask.objects.update_or_create(
id=task_result_instance.task_id,
tenant_id=tenant_id,
+8 -8
View File
@@ -591,21 +591,21 @@ def backfill_finding_group_summaries(tenant_id: str, days: int = None):
completed_scans = (
Scan.objects.filter(**scan_filter)
.order_by("-completed_at")
.values("id", "completed_at")
.values("id", "provider_id", "completed_at")
)
if not completed_scans:
return {"status": "no scans to backfill"}
# Keep only latest scan per day
latest_scans_by_day = {}
# Keep only latest scan per provider/day
latest_scans_by_provider_day = {}
for scan in completed_scans:
key = scan["completed_at"].date()
if key not in latest_scans_by_day:
latest_scans_by_day[key] = scan
key = (scan["provider_id"], scan["completed_at"].date())
if key not in latest_scans_by_provider_day:
latest_scans_by_provider_day[key] = scan
# Process each day's scan
for scan_date, scan in latest_scans_by_day.items():
# Process each provider/day scan
for _, scan in latest_scans_by_provider_day.items():
scan_id = str(scan["id"])
try:
+25
View File
@@ -766,6 +766,31 @@ def aggregate_finding_group_summaries_task(tenant_id: str, scan_id: str):
return aggregate_finding_group_summaries(tenant_id=tenant_id, scan_id=scan_id)
@shared_task(
    base=RLSTask,
    name="reaggregate-finding-group-summaries-for-scans",
    queue="overview",
)
@set_tenant(keep_tenant=True)
def reaggregate_finding_group_summaries_for_scans_task(
    tenant_id: str, scan_ids: list[str]
):
    """Reaggregate finding group summaries for a targeted list of scans."""
    # Drop empty ids and duplicates while preserving first-seen order.
    unique_scan_ids = [
        scan_id for scan_id in dict.fromkeys(scan_ids) if scan_id
    ]
    if not unique_scan_ids:
        return {"scans_reaggregated": 0}
    logger.info(
        "Reaggregating finding group summaries for %d targeted scans",
        len(unique_scan_ids),
    )
    # Fan out one aggregation subtask per scan as a parallel Celery group.
    group(
        aggregate_finding_group_summaries_task.si(
            tenant_id=tenant_id, scan_id=scan_id
        )
        for scan_id in unique_scan_ids
    ).apply_async()
    return {"scans_reaggregated": len(unique_scan_ids)}
@shared_task(
base=RLSTask, name="reaggregate-all-finding-group-summaries", queue="overview"
)
+72 -1
View File
@@ -1,10 +1,12 @@
from datetime import datetime, timezone
from contextlib import nullcontext
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
from uuid import uuid4
import pytest
from tasks.jobs.backfill import (
backfill_compliance_summaries,
backfill_finding_group_summaries,
backfill_provider_compliance_scores,
backfill_resource_scan_summaries,
backfill_scan_category_summaries,
@@ -225,6 +227,75 @@ class TestBackfillComplianceSummaries:
assert summary.total_requirements == expected_counts["total_requirements"]
@pytest.mark.django_db
class TestBackfillFindingGroupSummaries:
    @patch("tasks.jobs.backfill.aggregate_finding_group_summaries")
    @patch("tasks.jobs.backfill.Scan.objects.filter")
    @patch("tasks.jobs.backfill.rls_transaction")
    def test_keeps_latest_scan_per_provider_per_day(
        self,
        mock_rls_transaction,
        mock_scan_filter,
        mock_aggregate_finding_group_summaries,
    ):
        """Backfill must aggregate the newest scan per (provider, day), not per day."""
        tenant_id = str(uuid4())
        provider_1 = uuid4()
        provider_2 = uuid4()
        now = datetime.now(tz=timezone.utc)
        yesterday = now - timedelta(days=1)
        # Four scans: two for provider_1 today (only the newest may win),
        # one for provider_2 today, one for provider_1 yesterday.
        latest_p1_today = uuid4()
        older_p1_today = uuid4()
        latest_p2_today = uuid4()
        latest_p1_yesterday = uuid4()
        # RLS context is irrelevant here; make it a no-op context manager.
        mock_rls_transaction.side_effect = lambda *args, **kwargs: nullcontext()
        # Scans are returned ordered by -completed_at, mirroring the job's
        # `.filter(...).order_by(...).values(...)` chain.
        mock_scan_filter.return_value.order_by.return_value.values.return_value = [
            {
                "id": latest_p1_today,
                "provider_id": provider_1,
                "completed_at": now,
            },
            {
                "id": latest_p2_today,
                "provider_id": provider_2,
                "completed_at": now - timedelta(minutes=2),
            },
            {
                "id": older_p1_today,
                "provider_id": provider_1,
                "completed_at": now - timedelta(hours=2),
            },
            {
                "id": latest_p1_yesterday,
                "provider_id": provider_1,
                "completed_at": yesterday,
            },
        ]
        mock_aggregate_finding_group_summaries.return_value = {
            "status": "completed",
            "created": 1,
            "updated": 0,
        }
        result = backfill_finding_group_summaries(tenant_id=tenant_id)
        assert result["status"] == "backfilled"
        assert result["scans_processed"] == 3
        assert result["scans_skipped"] == 0
        # older_p1_today shares (provider_1, today) with a newer scan and
        # must be dropped; the other three scans each win their bucket.
        assert mock_aggregate_finding_group_summaries.call_count == 3
        called_scan_ids = {
            call.args[1]
            for call in mock_aggregate_finding_group_summaries.call_args_list
        }
        assert called_scan_ids == {
            str(latest_p1_today),
            str(latest_p2_today),
            str(latest_p1_yesterday),
        }
@pytest.mark.django_db
class TestBackfillScanCategorySummaries:
def test_already_backfilled(self, scan_category_summary_fixture):
+55
View File
@@ -21,6 +21,7 @@ from tasks.tasks import (
perform_attack_paths_scan_task,
perform_scheduled_scan_task,
reaggregate_all_finding_group_summaries_task,
reaggregate_finding_group_summaries_for_scans_task,
refresh_lighthouse_provider_models_task,
s3_integration_task,
security_hub_integration_task,
@@ -2457,3 +2458,57 @@ class TestReaggregateAllFindingGroupSummaries:
assert result == {"scans_reaggregated": 0}
mock_group.assert_not_called()
@pytest.mark.django_db
class TestReaggregateFindingGroupSummariesForScans:
    def setup_method(self):
        # Fresh tenant id per test; the task only forwards it to subtasks.
        self.tenant_id = str(uuid.uuid4())

    @patch("tasks.tasks.group")
    @patch("tasks.tasks.aggregate_finding_group_summaries_task")
    def test_dispatches_requested_scan_ids(self, mock_agg_task, mock_group):
        """Each requested scan id becomes one aggregation subtask in a Celery group."""
        scan_id_1 = str(uuid.uuid4())
        scan_id_2 = str(uuid.uuid4())
        mock_group_result = MagicMock()
        # group() receives a generator; force its evaluation so the .si()
        # calls are recorded, then hand back the mock group object so
        # .apply_async() can be asserted on.
        mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
        result = reaggregate_finding_group_summaries_for_scans_task(
            tenant_id=self.tenant_id,
            scan_ids=[scan_id_1, scan_id_2],
        )
        assert result == {"scans_reaggregated": 2}
        mock_agg_task.si.assert_any_call(tenant_id=self.tenant_id, scan_id=scan_id_1)
        mock_agg_task.si.assert_any_call(tenant_id=self.tenant_id, scan_id=scan_id_2)
        assert mock_agg_task.si.call_count == 2
        mock_group_result.apply_async.assert_called_once()

    @patch("tasks.tasks.group")
    @patch("tasks.tasks.aggregate_finding_group_summaries_task")
    def test_dedupes_scan_ids_before_dispatch(self, mock_agg_task, mock_group):
        """Duplicate and empty scan ids are dropped before dispatch."""
        scan_id_1 = str(uuid.uuid4())
        scan_id_2 = str(uuid.uuid4())
        mock_group_result = MagicMock()
        mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
        # scan_id_1 appears twice and "" is empty: only two subtasks expected.
        result = reaggregate_finding_group_summaries_for_scans_task(
            tenant_id=self.tenant_id,
            scan_ids=[scan_id_1, scan_id_2, scan_id_1, ""],
        )
        assert result == {"scans_reaggregated": 2}
        assert mock_agg_task.si.call_count == 2
        mock_group_result.apply_async.assert_called_once()

    @patch("tasks.tasks.group")
    def test_empty_scan_ids_skips_dispatch(self, mock_group):
        """An empty scan list must not build or dispatch any Celery group."""
        result = reaggregate_finding_group_summaries_for_scans_task(
            tenant_id=self.tenant_id,
            scan_ids=[],
        )
        assert result == {"scans_reaggregated": 0}
        mock_group.assert_not_called()