fix(api): reaggregate overview summaries after muting findings (#10827)

This commit is contained in:
Adrián Peña
2026-04-22 10:44:21 +02:00
committed by GitHub
parent 12d475e7af
commit 1456def7d4
5 changed files with 160 additions and 31 deletions
+5
View File
@@ -12,9 +12,14 @@ All notable changes to the **Prowler API** are documented in this file.
## [1.25.3] (Prowler v5.24.3)
### 🚀 Added
- `/overviews/findings`, `/overviews/findings-severity` and `/overviews/services` now reflect newly-muted findings without waiting for the next scan. The post-mute `reaggregate-all-finding-group-summaries` task was extended to re-run the same per-scan pipeline that scan completion runs (`ScanSummary`, `DailySeveritySummary`, `FindingGroupDailySummary`) on the latest scan of every `(provider, day)` pair, keeping the pre-aggregated tables in sync with `Finding.muted` updates [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827)
### 🐞 Fixed
- Finding groups aggregated `status` now treats muted findings as resolved: a group is `FAIL` only while at least one non-muted FAIL remains, otherwise it is `PASS` (including fully-muted groups). The `filter[status]` filter and the `sort=status` ordering share the same semantics, keeping `status` consistent with `fail_count` and the orthogonal `muted` flag [(#10825)](https://github.com/prowler-cloud/prowler/pull/10825)
- `aggregate_findings` is now idempotent: it deletes the scan's existing `ScanSummary` rows before `bulk_create`, so re-runs (such as the post-mute reaggregation pipeline) no longer violate the `unique_scan_summary` constraint and no longer abort the downstream `DailySeveritySummary` / `FindingGroupDailySummary` recomputation for the affected scan [(#10827)](https://github.com/prowler-cloud/prowler/pull/10827)
---
+3
View File
@@ -1198,6 +1198,9 @@ def aggregate_findings(tenant_id: str, scan_id: str):
)
for agg in aggregation
}
# Delete first so re-runs (e.g. post-mute reaggregation) don't hit
# the `unique_scan_summary` constraint.
ScanSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id).delete()
ScanSummary.objects.bulk_create(scan_aggregations, batch_size=3000)
+27 -9
View File
@@ -771,15 +771,22 @@ def aggregate_finding_group_summaries_task(tenant_id: str, scan_id: str):
)
@set_tenant(keep_tenant=True)
def reaggregate_all_finding_group_summaries_task(tenant_id: str):
"""Reaggregate finding group summaries for every (provider, day) combination.
"""Reaggregate every pre-aggregated summary table for this tenant.
Mirrors the unbounded scope of `mute_historical_findings_task`: that task
rewrites every Finding row whose UID matches a mute rule, with no time
limit. To keep the daily summaries consistent with that update, this task
re-runs the aggregator on the latest completed scan of every (provider,
day) pair that exists in the database. Tasks are dispatched in parallel
via a Celery group so the wallclock scales with the worker pool, not with
the number of pairs.
limit. To keep the pre-aggregated tables consistent with that update,
this task re-runs the same per-scan aggregation pipeline that scan
completion runs on the latest completed scan of every (provider, day)
pair, rebuilding the three tables that power the read endpoints:
- `ScanSummary` and `DailySeveritySummary` -> `/overviews/findings`,
`/overviews/findings-severity`, `/overviews/services`.
- `FindingGroupDailySummary` -> `/finding-groups` and
`/finding-groups/latest`.
Per-scan pipelines are dispatched in parallel via a Celery group so
wallclock scales with the worker pool.
"""
completed_scans = list(
Scan.objects.filter(
@@ -804,12 +811,23 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str):
scan_ids = list(latest_scans.values())
if scan_ids:
logger.info(
"Reaggregating finding group summaries for %d scans (provider x day)",
"Reaggregating overview/finding summaries for %d scans (provider x day)",
len(scan_ids),
)
# DailySeveritySummary reads from ScanSummary, so ScanSummary must be
# recomputed first; FindingGroupDailySummary reads from Finding
# directly and can run in parallel with the severity step.
group(
aggregate_finding_group_summaries_task.si(
tenant_id=tenant_id, scan_id=scan_id
chain(
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
group(
aggregate_daily_severity_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
aggregate_finding_group_summaries_task.si(
tenant_id=tenant_id, scan_id=scan_id
),
),
)
for scan_id in scan_ids
).apply_async()
+59
View File
@@ -36,6 +36,7 @@ from api.models import (
Provider,
Resource,
Scan,
ScanSummary,
StateChoices,
StatusChoices,
)
@@ -3358,6 +3359,64 @@ class TestAggregateFindings:
regions = {s.region for s in summaries}
assert regions == {"us-east-1", "us-west-2"}
def test_aggregate_findings_is_idempotent_on_rerun(
self,
tenants_fixture,
scans_fixture,
findings_fixture,
):
"""Re-running `aggregate_findings` for the same scan must not violate
the `unique_scan_summary` constraint, and the resulting row set for
the scan must match the single-run output. This is exercised by the
post-mute reaggregation pipeline, which re-dispatches
`perform_scan_summary_task` against scans whose summaries already
exist."""
tenant = tenants_fixture[0]
scan = scans_fixture[0]
aggregate_findings(str(tenant.id), str(scan.id))
first_run_ids = set(
ScanSummary.all_objects.filter(
tenant_id=tenant.id, scan_id=scan.id
).values_list("id", flat=True)
)
first_run_rows = list(
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
"check_id",
"service",
"severity",
"region",
"fail",
"_pass",
"muted",
"total",
)
)
# Second invocation must not raise and must replace the rows without
# leaving duplicates behind.
aggregate_findings(str(tenant.id), str(scan.id))
second_run_ids = set(
ScanSummary.all_objects.filter(
tenant_id=tenant.id, scan_id=scan.id
).values_list("id", flat=True)
)
second_run_rows = list(
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
"check_id",
"service",
"severity",
"region",
"fail",
"_pass",
"muted",
"total",
)
)
assert second_run_rows == first_run_rows
assert first_run_ids.isdisjoint(second_run_ids)
@pytest.mark.django_db
class TestAggregateFindingsByRegion:
+66 -22
View File
@@ -2359,11 +2359,20 @@ class TestReaggregateAllFindingGroupSummaries:
def setup_method(self):
self.tenant_id = str(uuid.uuid4())
@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
@patch("tasks.tasks.aggregate_daily_severity_task")
@patch("tasks.tasks.perform_scan_summary_task")
@patch("tasks.tasks.Scan.objects.filter")
def test_dispatches_subtasks_for_each_provider_per_day(
self, mock_scan_filter, mock_agg_task, mock_group
self,
mock_scan_filter,
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
mock_group,
mock_chain,
):
provider_id_1 = uuid.uuid4()
provider_id_2 = uuid.uuid4()
@@ -2373,8 +2382,13 @@ class TestReaggregateAllFindingGroupSummaries:
today = datetime.now(tz=timezone.utc)
yesterday = today - timedelta(days=1)
mock_group_result = MagicMock()
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
mock_outer_group_result = MagicMock()
# The first `group()` call wraps the inner (severity, finding-group)
# parallel step; subsequent calls wrap the outer per-scan generator.
mock_group.side_effect = lambda *args, **kwargs: (
list(args[0]) if args and hasattr(args[0], "__iter__") else None,
mock_outer_group_result,
)[1]
mock_scan_filter.return_value.order_by.return_value.values.return_value = [
{
@@ -2397,23 +2411,40 @@ class TestReaggregateAllFindingGroupSummaries:
result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)
assert result == {"scans_reaggregated": 3}
assert mock_agg_task.si.call_count == 3
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_today_p1)
)
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_today_p2)
)
mock_agg_task.si.assert_any_call(
tenant_id=self.tenant_id, scan_id=str(scan_id_yesterday_p1)
)
mock_group_result.apply_async.assert_called_once()
expected_scan_ids = {
str(scan_id_today_p1),
str(scan_id_today_p2),
str(scan_id_yesterday_p1),
}
for task_mock in (
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
):
assert task_mock.si.call_count == 3
dispatched = {
call.kwargs["scan_id"] for call in task_mock.si.call_args_list
}
assert dispatched == expected_scan_ids
for call in task_mock.si.call_args_list:
assert call.kwargs["tenant_id"] == self.tenant_id
assert mock_chain.call_count == 3
mock_outer_group_result.apply_async.assert_called_once()
@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
@patch("tasks.tasks.aggregate_daily_severity_task")
@patch("tasks.tasks.perform_scan_summary_task")
@patch("tasks.tasks.Scan.objects.filter")
def test_dedupes_scans_to_latest_per_provider_per_day(
self, mock_scan_filter, mock_agg_task, mock_group
self,
mock_scan_filter,
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
mock_group,
mock_chain,
):
"""When several scans run on the same day for the same provider, only
the latest one is dispatched (matching the daily summary unique key)."""
@@ -2423,8 +2454,11 @@ class TestReaggregateAllFindingGroupSummaries:
today_late = datetime.now(tz=timezone.utc)
today_early = today_late - timedelta(hours=4)
mock_group_result = MagicMock()
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
mock_outer_group_result = MagicMock()
mock_group.side_effect = lambda *args, **kwargs: (
list(args[0]) if args and hasattr(args[0], "__iter__") else None,
mock_outer_group_result,
)[1]
# Returned ordered by `-completed_at`, so the most recent comes first.
mock_scan_filter.return_value.order_by.return_value.values.return_value = [
@@ -2443,17 +2477,27 @@ class TestReaggregateAllFindingGroupSummaries:
result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)
assert result == {"scans_reaggregated": 1}
mock_agg_task.si.assert_called_once_with(
tenant_id=self.tenant_id, scan_id=str(latest_scan_today)
)
mock_group_result.apply_async.assert_called_once()
for task_mock in (
mock_scan_summary_task,
mock_daily_severity_task,
mock_finding_group_task,
):
task_mock.si.assert_called_once_with(
tenant_id=self.tenant_id, scan_id=str(latest_scan_today)
)
mock_chain.assert_called_once()
mock_outer_group_result.apply_async.assert_called_once()
@patch("tasks.tasks.chain")
@patch("tasks.tasks.group")
@patch("tasks.tasks.Scan.objects.filter")
def test_no_completed_scans_skips_dispatch(self, mock_scan_filter, mock_group):
def test_no_completed_scans_skips_dispatch(
self, mock_scan_filter, mock_group, mock_chain
):
mock_scan_filter.return_value.order_by.return_value.values.return_value = []
result = reaggregate_all_finding_group_summaries_task(tenant_id=self.tenant_id)
assert result == {"scans_reaggregated": 0}
mock_group.assert_not_called()
mock_chain.assert_not_called()