Mirror of https://github.com/prowler-cloud/prowler.git, synced 2026-05-06 08:47:18 +00:00
fix(api): reaggregate resource inventory and attack surface after muting findings (#10843)
This commit is contained in:
@@ -6,12 +6,15 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
|
||||
### 🚀 Added
|
||||
|
||||
- `/overviews/resource-groups` (resource inventory), `/overviews/categories` and `/overviews/attack-surfaces` now reflect newly-muted findings without waiting for the next scan. The post-mute `reaggregate-all-finding-group-summaries` task now also dispatches `aggregate_scan_resource_group_summaries_task`, `aggregate_scan_category_summaries_task` and `aggregate_attack_surface_task` per latest scan of every `(provider, day)` pair, rebuilding `ScanGroupSummary`, `ScanCategorySummary` and `AttackSurfaceOverview` alongside the tables already covered in #10827 [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
|
||||
- CIS Benchmark PDF report generation for scans, exposing the latest CIS version per provider via `GET /scans/{id}/cis/{name}/` and picking the variant dynamically via `_pick_latest_cis_variant` (no hard-coded provider → version mapping) [(#10650)](https://github.com/prowler-cloud/prowler/pull/10650)
|
||||
- Install zizmor v1.24.1 in API Docker image for GitHub Actions workflow scanning [(#10607)](https://github.com/prowler-cloud/prowler/pull/10607)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
- Allow tenant owners to expel users from their organizations [(#10787)](https://github.com/prowler-cloud/prowler/pull/10787)
|
||||
- `aggregate_findings`, `aggregate_attack_surface`, `aggregate_scan_resource_group_summaries` and `aggregate_scan_category_summaries` now upsert via `bulk_create(update_conflicts=True, ...)` instead of the prior `ignore_conflicts=True` / plain INSERT / `already backfilled` short-circuit. Re-runs triggered by the post-mute reaggregation pipeline no longer trip the `unique_*_per_scan` constraints or silently drop updates, and are race-safe under concurrent writers (e.g. scan completion overlapping with a fresh mute rule) [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
|
||||
- Rename the scan-category and scan-resource-group summary aggregators from `backfill_*` to `aggregate_*` (`backfill_scan_category_summaries` -> `aggregate_scan_category_summaries`, `backfill_scan_resource_group_summaries` -> `aggregate_scan_resource_group_summaries`; Celery task names `backfill-scan-category-summaries` -> `scan-category-summaries`, `backfill-scan-resource-group-summaries` -> `scan-resource-group-summaries`) and move them to the `overview` queue, matching the sibling per-scan aggregators (`perform_scan_summary_task`, `aggregate_daily_severity_task`, `aggregate_finding_group_summaries_task`, `aggregate_attack_surface_task`). The old names had no dispatchers outside the post-mute reaggregation chain, so no task-registry migration is required [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -14,8 +14,8 @@ from rest_framework import status
|
||||
from rest_framework.test import APIClient
|
||||
from tasks.jobs.backfill import (
|
||||
backfill_resource_scan_summaries,
|
||||
backfill_scan_category_summaries,
|
||||
backfill_scan_resource_group_summaries,
|
||||
aggregate_scan_category_summaries,
|
||||
aggregate_scan_resource_group_summaries,
|
||||
)
|
||||
|
||||
from api.attack_paths import (
|
||||
@@ -1445,8 +1445,8 @@ def latest_scan_finding_with_categories(
|
||||
)
|
||||
finding.add_resources([resource])
|
||||
backfill_resource_scan_summaries(tenant_id, str(scan.id))
|
||||
backfill_scan_category_summaries(tenant_id, str(scan.id))
|
||||
backfill_scan_resource_group_summaries(tenant_id, str(scan.id))
|
||||
aggregate_scan_category_summaries(tenant_id, str(scan.id))
|
||||
aggregate_scan_resource_group_summaries(tenant_id, str(scan.id))
|
||||
return finding
|
||||
|
||||
|
||||
|
||||
@@ -297,12 +297,15 @@ def backfill_daily_severity_summaries(tenant_id: str, days: int = None):
|
||||
}
|
||||
|
||||
|
||||
def backfill_scan_category_summaries(tenant_id: str, scan_id: str):
|
||||
def aggregate_scan_category_summaries(tenant_id: str, scan_id: str):
|
||||
"""
|
||||
Backfill ScanCategorySummary for a completed scan.
|
||||
|
||||
Aggregates category counts from all findings in the scan and creates
|
||||
one ScanCategorySummary row per (category, severity) combination.
|
||||
Idempotent: re-runs replace the scan's existing rows so counts stay in
|
||||
sync with `Finding.muted` updates triggered outside scan completion
|
||||
(e.g. mute rules).
|
||||
|
||||
Args:
|
||||
tenant_id: Target tenant UUID
|
||||
@@ -312,11 +315,6 @@ def backfill_scan_category_summaries(tenant_id: str, scan_id: str):
|
||||
dict: Status indicating whether backfill was performed
|
||||
"""
|
||||
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
|
||||
if ScanCategorySummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).exists():
|
||||
return {"status": "already backfilled"}
|
||||
|
||||
if not Scan.objects.filter(
|
||||
tenant_id=tenant_id,
|
||||
id=scan_id,
|
||||
@@ -337,9 +335,6 @@ def backfill_scan_category_summaries(tenant_id: str, scan_id: str):
|
||||
cache=category_counts,
|
||||
)
|
||||
|
||||
if not category_counts:
|
||||
return {"status": "no categories to backfill"}
|
||||
|
||||
category_summaries = [
|
||||
ScanCategorySummary(
|
||||
tenant_id=tenant_id,
|
||||
@@ -353,20 +348,38 @@ def backfill_scan_category_summaries(tenant_id: str, scan_id: str):
|
||||
for (category, severity), counts in category_counts.items()
|
||||
]
|
||||
|
||||
with rls_transaction(tenant_id):
|
||||
ScanCategorySummary.objects.bulk_create(
|
||||
category_summaries, batch_size=500, ignore_conflicts=True
|
||||
)
|
||||
if category_summaries:
|
||||
with rls_transaction(tenant_id):
|
||||
# Upsert so re-runs (post-mute reaggregation) don't trip
|
||||
# `unique_category_severity_per_scan`; race-safe under concurrent writers.
|
||||
ScanCategorySummary.objects.bulk_create(
|
||||
category_summaries,
|
||||
batch_size=500,
|
||||
update_conflicts=True,
|
||||
unique_fields=["tenant_id", "scan_id", "category", "severity"],
|
||||
update_fields=[
|
||||
"total_findings",
|
||||
"failed_findings",
|
||||
"new_failed_findings",
|
||||
],
|
||||
)
|
||||
|
||||
if not category_counts:
|
||||
return {"status": "no categories to backfill"}
|
||||
|
||||
return {"status": "backfilled", "categories_count": len(category_counts)}
|
||||
|
||||
|
||||
def backfill_scan_resource_group_summaries(tenant_id: str, scan_id: str):
|
||||
def aggregate_scan_resource_group_summaries(tenant_id: str, scan_id: str):
|
||||
"""
|
||||
Backfill ScanGroupSummary for a completed scan.
|
||||
|
||||
Aggregates resource group counts from all findings in the scan and creates
|
||||
one ScanGroupSummary row per (resource_group, severity) combination.
|
||||
Idempotent: re-runs replace the scan's existing rows so counts stay in
|
||||
sync with `Finding.muted` updates triggered outside scan completion
|
||||
(e.g. mute rules) and with resource-inventory views reading from this
|
||||
table.
|
||||
|
||||
Args:
|
||||
tenant_id: Target tenant UUID
|
||||
@@ -376,11 +389,6 @@ def backfill_scan_resource_group_summaries(tenant_id: str, scan_id: str):
|
||||
dict: Status indicating whether backfill was performed
|
||||
"""
|
||||
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
|
||||
if ScanGroupSummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).exists():
|
||||
return {"status": "already backfilled"}
|
||||
|
||||
if not Scan.objects.filter(
|
||||
tenant_id=tenant_id,
|
||||
id=scan_id,
|
||||
@@ -418,9 +426,6 @@ def backfill_scan_resource_group_summaries(tenant_id: str, scan_id: str):
|
||||
group_resources_cache=group_resources_cache,
|
||||
)
|
||||
|
||||
if not resource_group_counts:
|
||||
return {"status": "no resource groups to backfill"}
|
||||
|
||||
# Compute group-level resource counts (same value for all severity rows in a group)
|
||||
group_resource_counts = {
|
||||
grp: len(uids) for grp, uids in group_resources_cache.items()
|
||||
@@ -439,10 +444,25 @@ def backfill_scan_resource_group_summaries(tenant_id: str, scan_id: str):
|
||||
for (grp, severity), counts in resource_group_counts.items()
|
||||
]
|
||||
|
||||
with rls_transaction(tenant_id):
|
||||
ScanGroupSummary.objects.bulk_create(
|
||||
resource_group_summaries, batch_size=500, ignore_conflicts=True
|
||||
)
|
||||
if resource_group_summaries:
|
||||
with rls_transaction(tenant_id):
|
||||
# Upsert so re-runs (post-mute reaggregation) don't trip
|
||||
# `unique_resource_group_severity_per_scan`; race-safe under concurrent writers.
|
||||
ScanGroupSummary.objects.bulk_create(
|
||||
resource_group_summaries,
|
||||
batch_size=500,
|
||||
update_conflicts=True,
|
||||
unique_fields=["tenant_id", "scan_id", "resource_group", "severity"],
|
||||
update_fields=[
|
||||
"total_findings",
|
||||
"failed_findings",
|
||||
"new_failed_findings",
|
||||
"resources_count",
|
||||
],
|
||||
)
|
||||
|
||||
if not resource_group_counts:
|
||||
return {"status": "no resource groups to backfill"}
|
||||
|
||||
return {"status": "backfilled", "resource_groups_count": len(resource_group_counts)}
|
||||
|
||||
|
||||
@@ -1198,10 +1198,36 @@ def aggregate_findings(tenant_id: str, scan_id: str):
|
||||
)
|
||||
for agg in aggregation
|
||||
}
|
||||
# Delete first so re-runs (e.g. post-mute reaggregation) don't hit
|
||||
# the `unique_scan_summary` constraint.
|
||||
ScanSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id).delete()
|
||||
ScanSummary.objects.bulk_create(scan_aggregations, batch_size=3000)
|
||||
# Upsert so re-runs (post-mute reaggregation) don't trip
|
||||
# `unique_scan_summary`; race-safe under concurrent writers.
|
||||
ScanSummary.objects.bulk_create(
|
||||
scan_aggregations,
|
||||
batch_size=3000,
|
||||
update_conflicts=True,
|
||||
unique_fields=[
|
||||
"tenant",
|
||||
"scan",
|
||||
"check_id",
|
||||
"service",
|
||||
"severity",
|
||||
"region",
|
||||
],
|
||||
update_fields=[
|
||||
"_pass",
|
||||
"fail",
|
||||
"muted",
|
||||
"total",
|
||||
"new",
|
||||
"changed",
|
||||
"unchanged",
|
||||
"fail_new",
|
||||
"fail_changed",
|
||||
"pass_new",
|
||||
"pass_changed",
|
||||
"muted_new",
|
||||
"muted_changed",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def _aggregate_findings_by_region(
|
||||
@@ -1546,13 +1572,24 @@ def aggregate_attack_surface(tenant_id: str, scan_id: str):
|
||||
)
|
||||
)
|
||||
|
||||
# Bulk create overview records
|
||||
if overview_objects:
|
||||
with rls_transaction(tenant_id):
|
||||
AttackSurfaceOverview.objects.bulk_create(overview_objects, batch_size=500)
|
||||
logger.info(
|
||||
f"Created {len(overview_objects)} attack surface overview records for scan {scan_id}"
|
||||
# Upsert so re-runs (post-mute reaggregation) don't trip
|
||||
# `unique_attack_surface_per_scan`; race-safe under concurrent writers.
|
||||
AttackSurfaceOverview.objects.bulk_create(
|
||||
overview_objects,
|
||||
batch_size=500,
|
||||
update_conflicts=True,
|
||||
unique_fields=["tenant_id", "scan_id", "attack_surface_type"],
|
||||
update_fields=[
|
||||
"total_findings",
|
||||
"failed_findings",
|
||||
"muted_failed_findings",
|
||||
],
|
||||
)
|
||||
logger.info(
|
||||
f"Upserted {len(overview_objects)} attack surface overview records for scan {scan_id}"
|
||||
)
|
||||
else:
|
||||
logger.info(f"No attack surface overview records created for scan {scan_id}")
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ from tasks.jobs.backfill import (
|
||||
backfill_finding_group_summaries,
|
||||
backfill_provider_compliance_scores,
|
||||
backfill_resource_scan_summaries,
|
||||
backfill_scan_category_summaries,
|
||||
backfill_scan_resource_group_summaries,
|
||||
aggregate_scan_category_summaries,
|
||||
aggregate_scan_resource_group_summaries,
|
||||
)
|
||||
from tasks.jobs.connection import (
|
||||
check_integration_connection,
|
||||
@@ -659,9 +659,9 @@ def backfill_finding_group_summaries_task(tenant_id: str, days: int = None):
|
||||
return backfill_finding_group_summaries(tenant_id=tenant_id, days=days)
|
||||
|
||||
|
||||
@shared_task(name="backfill-scan-category-summaries", queue="backfill")
|
||||
@shared_task(name="scan-category-summaries", queue="overview")
|
||||
@handle_provider_deletion
|
||||
def backfill_scan_category_summaries_task(tenant_id: str, scan_id: str):
|
||||
def aggregate_scan_category_summaries_task(tenant_id: str, scan_id: str):
|
||||
"""
|
||||
Backfill ScanCategorySummary for a completed scan.
|
||||
|
||||
@@ -671,12 +671,12 @@ def backfill_scan_category_summaries_task(tenant_id: str, scan_id: str):
|
||||
tenant_id (str): The tenant identifier.
|
||||
scan_id (str): The scan identifier.
|
||||
"""
|
||||
return backfill_scan_category_summaries(tenant_id=tenant_id, scan_id=scan_id)
|
||||
return aggregate_scan_category_summaries(tenant_id=tenant_id, scan_id=scan_id)
|
||||
|
||||
|
||||
@shared_task(name="backfill-scan-resource-group-summaries", queue="backfill")
|
||||
@shared_task(name="scan-resource-group-summaries", queue="overview")
|
||||
@handle_provider_deletion
|
||||
def backfill_scan_resource_group_summaries_task(tenant_id: str, scan_id: str):
|
||||
def aggregate_scan_resource_group_summaries_task(tenant_id: str, scan_id: str):
|
||||
"""
|
||||
Backfill ScanGroupSummary for a completed scan.
|
||||
|
||||
@@ -686,7 +686,7 @@ def backfill_scan_resource_group_summaries_task(tenant_id: str, scan_id: str):
|
||||
tenant_id (str): The tenant identifier.
|
||||
scan_id (str): The scan identifier.
|
||||
"""
|
||||
return backfill_scan_resource_group_summaries(tenant_id=tenant_id, scan_id=scan_id)
|
||||
return aggregate_scan_resource_group_summaries(tenant_id=tenant_id, scan_id=scan_id)
|
||||
|
||||
|
||||
@shared_task(name="backfill-provider-compliance-scores", queue="backfill")
|
||||
@@ -778,12 +778,16 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str):
|
||||
limit. To keep the pre-aggregated tables consistent with that update,
|
||||
this task re-runs the same per-scan aggregation pipeline that scan
|
||||
completion runs on the latest completed scan of every (provider, day)
|
||||
pair, rebuilding the three tables that power the read endpoints:
|
||||
pair, rebuilding the tables that power the read endpoints:
|
||||
|
||||
- `ScanSummary` and `DailySeveritySummary` -> `/overviews/findings`,
|
||||
`/overviews/findings-severity`, `/overviews/services`.
|
||||
- `FindingGroupDailySummary` -> `/finding-groups` and
|
||||
`/finding-groups/latest`.
|
||||
- `ScanGroupSummary` -> `/overviews/resource-groups` (resource
|
||||
inventory).
|
||||
- `ScanCategorySummary` -> `/overviews/categories`.
|
||||
- `AttackSurfaceOverview` -> `/overviews/attack-surfaces`.
|
||||
|
||||
Per-scan pipelines are dispatched in parallel via a Celery group so
|
||||
wallclock scales with the worker pool.
|
||||
@@ -815,8 +819,8 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str):
|
||||
len(scan_ids),
|
||||
)
|
||||
# DailySeveritySummary reads from ScanSummary, so ScanSummary must be
|
||||
# recomputed first; FindingGroupDailySummary reads from Finding
|
||||
# directly and can run in parallel with the severity step.
|
||||
# recomputed first; the other aggregators read Finding directly and
|
||||
# can run in parallel with the severity step.
|
||||
group(
|
||||
chain(
|
||||
perform_scan_summary_task.si(tenant_id=tenant_id, scan_id=scan_id),
|
||||
@@ -827,6 +831,15 @@ def reaggregate_all_finding_group_summaries_task(tenant_id: str):
|
||||
aggregate_finding_group_summaries_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
aggregate_scan_resource_group_summaries_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
aggregate_scan_category_summaries_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
aggregate_attack_surface_task.si(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
),
|
||||
),
|
||||
)
|
||||
for scan_id in scan_ids
|
||||
|
||||
@@ -7,8 +7,8 @@ from tasks.jobs.backfill import (
|
||||
backfill_compliance_summaries,
|
||||
backfill_provider_compliance_scores,
|
||||
backfill_resource_scan_summaries,
|
||||
backfill_scan_category_summaries,
|
||||
backfill_scan_resource_group_summaries,
|
||||
aggregate_scan_category_summaries,
|
||||
aggregate_scan_resource_group_summaries,
|
||||
)
|
||||
|
||||
from api.models import (
|
||||
@@ -183,6 +183,10 @@ class TestBackfillComplianceSummaries:
|
||||
def test_backfill_creates_compliance_summaries(
|
||||
self, tenants_fixture, scans_fixture, compliance_requirements_overviews_fixture
|
||||
):
|
||||
# Fixture seeds compliance rows the backfill aggregates over; pytest
|
||||
# injects it by parameter name, so we reference it explicitly here
|
||||
# to keep static analysers from flagging it as unused.
|
||||
del compliance_requirements_overviews_fixture
|
||||
tenant = tenants_fixture[0]
|
||||
scan = scans_fixture[0]
|
||||
|
||||
@@ -227,22 +231,86 @@ class TestBackfillComplianceSummaries:
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBackfillScanCategorySummaries:
|
||||
def test_already_backfilled(self, scan_category_summary_fixture):
|
||||
def test_rerun_with_no_findings_is_noop(self, scan_category_summary_fixture):
|
||||
"""When the scan has no findings, the backfill is a no-op: it
|
||||
reports `no categories to backfill` and leaves the table
|
||||
untouched. The upsert path cannot drop rows it does not produce,
|
||||
so any pre-existing row survives (matching the scan-completion
|
||||
writer that used `ignore_conflicts=True`)."""
|
||||
tenant_id = scan_category_summary_fixture.tenant_id
|
||||
scan_id = scan_category_summary_fixture.scan_id
|
||||
|
||||
result = backfill_scan_category_summaries(str(tenant_id), str(scan_id))
|
||||
result = aggregate_scan_category_summaries(str(tenant_id), str(scan_id))
|
||||
|
||||
assert result == {"status": "already backfilled"}
|
||||
assert result == {"status": "no categories to backfill"}
|
||||
assert ScanCategorySummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id, category="existing-category"
|
||||
).exists()
|
||||
|
||||
def test_rerun_upserts_without_duplicating(self, findings_with_categories_fixture):
|
||||
"""Calling the backfill twice upserts rather than raising on
|
||||
`unique_category_severity_per_scan`; rows are updated in place
|
||||
(same primary keys)."""
|
||||
finding = findings_with_categories_fixture
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
aggregate_scan_category_summaries(tenant_id, scan_id)
|
||||
first_ids = set(
|
||||
ScanCategorySummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).values_list("id", flat=True)
|
||||
)
|
||||
|
||||
aggregate_scan_category_summaries(tenant_id, scan_id)
|
||||
second_ids = set(
|
||||
ScanCategorySummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).values_list("id", flat=True)
|
||||
)
|
||||
|
||||
assert first_ids == second_ids
|
||||
assert len(first_ids) == 2 # 2 categories x 1 severity
|
||||
|
||||
def test_rerun_reflects_mute_between_runs(self, findings_with_categories_fixture):
|
||||
"""Muting a finding between two backfill runs must move counters:
|
||||
`failed_findings` and `new_failed_findings` drop to zero (muted
|
||||
findings are excluded from those totals). Guards against a
|
||||
regression where the upsert keeps stale counts from the first run."""
|
||||
finding = findings_with_categories_fixture
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
aggregate_scan_category_summaries(tenant_id, scan_id)
|
||||
before = list(
|
||||
ScanCategorySummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id)
|
||||
)
|
||||
assert all(s.failed_findings == 1 for s in before)
|
||||
assert all(s.new_failed_findings == 1 for s in before)
|
||||
assert all(s.total_findings == 1 for s in before)
|
||||
|
||||
Finding.all_objects.filter(pk=finding.pk).update(muted=True)
|
||||
|
||||
aggregate_scan_category_summaries(tenant_id, scan_id)
|
||||
after = list(
|
||||
ScanCategorySummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id)
|
||||
)
|
||||
|
||||
assert {s.id for s in after} == {s.id for s in before}
|
||||
assert all(s.failed_findings == 0 for s in after)
|
||||
assert all(s.new_failed_findings == 0 for s in after)
|
||||
assert all(s.total_findings == 0 for s in after)
|
||||
|
||||
def test_not_completed_scan(self, get_not_completed_scans):
|
||||
for scan in get_not_completed_scans:
|
||||
result = backfill_scan_category_summaries(str(scan.tenant_id), str(scan.id))
|
||||
result = aggregate_scan_category_summaries(
|
||||
str(scan.tenant_id), str(scan.id)
|
||||
)
|
||||
assert result == {"status": "scan is not completed"}
|
||||
|
||||
def test_no_categories_to_backfill(self, scans_fixture):
|
||||
scan = scans_fixture[1] # Failed scan with no findings
|
||||
result = backfill_scan_category_summaries(str(scan.tenant_id), str(scan.id))
|
||||
result = aggregate_scan_category_summaries(str(scan.tenant_id), str(scan.id))
|
||||
assert result == {"status": "no categories to backfill"}
|
||||
|
||||
def test_successful_backfill(self, findings_with_categories_fixture):
|
||||
@@ -250,7 +318,7 @@ class TestBackfillScanCategorySummaries:
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
result = backfill_scan_category_summaries(tenant_id, scan_id)
|
||||
result = aggregate_scan_category_summaries(tenant_id, scan_id)
|
||||
|
||||
# 2 categories × 1 severity = 2 rows
|
||||
assert result == {"status": "backfilled", "categories_count": 2}
|
||||
@@ -311,24 +379,87 @@ def scan_resource_group_summary_fixture(scans_fixture):
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBackfillScanGroupSummaries:
|
||||
def test_already_backfilled(self, scan_resource_group_summary_fixture):
|
||||
def test_rerun_with_no_findings_is_noop(self, scan_resource_group_summary_fixture):
|
||||
"""When the scan has no findings, the backfill is a no-op: it
|
||||
reports `no resource groups to backfill` and leaves the table
|
||||
untouched. The upsert path cannot drop rows it does not produce,
|
||||
so any pre-existing row survives (matching the scan-completion
|
||||
writer that used `ignore_conflicts=True`)."""
|
||||
tenant_id = scan_resource_group_summary_fixture.tenant_id
|
||||
scan_id = scan_resource_group_summary_fixture.scan_id
|
||||
|
||||
result = backfill_scan_resource_group_summaries(str(tenant_id), str(scan_id))
|
||||
result = aggregate_scan_resource_group_summaries(str(tenant_id), str(scan_id))
|
||||
|
||||
assert result == {"status": "already backfilled"}
|
||||
assert result == {"status": "no resource groups to backfill"}
|
||||
assert ScanGroupSummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id, resource_group="existing-group"
|
||||
).exists()
|
||||
|
||||
def test_rerun_upserts_without_duplicating(self, findings_with_group_fixture):
|
||||
"""Calling the backfill twice upserts rather than raising on
|
||||
`unique_resource_group_severity_per_scan`; rows are updated in
|
||||
place (same primary keys)."""
|
||||
finding = findings_with_group_fixture
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
aggregate_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
first_ids = set(
|
||||
ScanGroupSummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).values_list("id", flat=True)
|
||||
)
|
||||
|
||||
aggregate_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
second_ids = set(
|
||||
ScanGroupSummary.objects.filter(
|
||||
tenant_id=tenant_id, scan_id=scan_id
|
||||
).values_list("id", flat=True)
|
||||
)
|
||||
|
||||
assert first_ids == second_ids
|
||||
assert len(first_ids) == 1 # 1 resource group x 1 severity
|
||||
|
||||
def test_rerun_reflects_mute_between_runs(self, findings_with_group_fixture):
|
||||
"""Muting a finding between two backfill runs must move counters:
|
||||
`failed_findings` and `new_failed_findings` drop to zero (muted
|
||||
findings are excluded from those totals). Guards against a
|
||||
regression where the upsert keeps stale counts from the first run."""
|
||||
finding = findings_with_group_fixture
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
aggregate_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
before = list(
|
||||
ScanGroupSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id)
|
||||
)
|
||||
assert len(before) == 1
|
||||
assert before[0].failed_findings == 1
|
||||
assert before[0].new_failed_findings == 1
|
||||
assert before[0].total_findings == 1
|
||||
|
||||
Finding.all_objects.filter(pk=finding.pk).update(muted=True)
|
||||
|
||||
aggregate_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
after = list(
|
||||
ScanGroupSummary.objects.filter(tenant_id=tenant_id, scan_id=scan_id)
|
||||
)
|
||||
|
||||
assert {s.id for s in after} == {s.id for s in before}
|
||||
assert after[0].failed_findings == 0
|
||||
assert after[0].new_failed_findings == 0
|
||||
assert after[0].total_findings == 0
|
||||
|
||||
def test_not_completed_scan(self, get_not_completed_scans):
|
||||
for scan in get_not_completed_scans:
|
||||
result = backfill_scan_resource_group_summaries(
|
||||
result = aggregate_scan_resource_group_summaries(
|
||||
str(scan.tenant_id), str(scan.id)
|
||||
)
|
||||
assert result == {"status": "scan is not completed"}
|
||||
|
||||
def test_no_resource_groups_to_backfill(self, scans_fixture):
|
||||
scan = scans_fixture[1] # Failed scan with no findings
|
||||
result = backfill_scan_resource_group_summaries(
|
||||
result = aggregate_scan_resource_group_summaries(
|
||||
str(scan.tenant_id), str(scan.id)
|
||||
)
|
||||
assert result == {"status": "no resource groups to backfill"}
|
||||
@@ -338,7 +469,7 @@ class TestBackfillScanGroupSummaries:
|
||||
tenant_id = str(finding.tenant_id)
|
||||
scan_id = str(finding.scan_id)
|
||||
|
||||
result = backfill_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
result = aggregate_scan_resource_group_summaries(tenant_id, scan_id)
|
||||
|
||||
# 1 resource group × 1 severity = 1 row
|
||||
assert result == {"status": "backfilled", "resource_groups_count": 1}
|
||||
|
||||
@@ -3366,14 +3366,24 @@ class TestAggregateFindings:
|
||||
findings_fixture,
|
||||
):
|
||||
"""Re-running `aggregate_findings` for the same scan must not violate
|
||||
the `unique_scan_summary` constraint, and the resulting row set for
|
||||
the scan must match the single-run output. This is exercised by the
|
||||
post-mute reaggregation pipeline, which re-dispatches
|
||||
`perform_scan_summary_task` against scans whose summaries already
|
||||
exist."""
|
||||
the `unique_scan_summary` constraint. The post-mute reaggregation
|
||||
pipeline re-dispatches `perform_scan_summary_task` against scans
|
||||
whose summaries already exist; upsert must update existing rows in
|
||||
place (same primary keys) rather than inserting duplicates."""
|
||||
tenant = tenants_fixture[0]
|
||||
scan = scans_fixture[0]
|
||||
|
||||
value_columns = (
|
||||
"check_id",
|
||||
"service",
|
||||
"severity",
|
||||
"region",
|
||||
"fail",
|
||||
"_pass",
|
||||
"muted",
|
||||
"total",
|
||||
)
|
||||
|
||||
aggregate_findings(str(tenant.id), str(scan.id))
|
||||
first_run_ids = set(
|
||||
ScanSummary.all_objects.filter(
|
||||
@@ -3382,19 +3392,11 @@ class TestAggregateFindings:
|
||||
)
|
||||
first_run_rows = list(
|
||||
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
|
||||
"check_id",
|
||||
"service",
|
||||
"severity",
|
||||
"region",
|
||||
"fail",
|
||||
"_pass",
|
||||
"muted",
|
||||
"total",
|
||||
*value_columns
|
||||
)
|
||||
)
|
||||
|
||||
# Second invocation must not raise and must replace the rows without
|
||||
# leaving duplicates behind.
|
||||
# Second invocation must not raise and must not duplicate rows.
|
||||
aggregate_findings(str(tenant.id), str(scan.id))
|
||||
second_run_ids = set(
|
||||
ScanSummary.all_objects.filter(
|
||||
@@ -3403,19 +3405,49 @@ class TestAggregateFindings:
|
||||
)
|
||||
second_run_rows = list(
|
||||
ScanSummary.all_objects.filter(tenant_id=tenant.id, scan_id=scan.id).values(
|
||||
"check_id",
|
||||
"service",
|
||||
"severity",
|
||||
"region",
|
||||
"fail",
|
||||
"_pass",
|
||||
"muted",
|
||||
"total",
|
||||
*value_columns
|
||||
)
|
||||
)
|
||||
|
||||
# Upsert preserves the original row identities; values stay stable
|
||||
# because the underlying Finding set is unchanged between runs.
|
||||
assert second_run_rows == first_run_rows
|
||||
assert first_run_ids.isdisjoint(second_run_ids)
|
||||
assert first_run_ids == second_run_ids
|
||||
|
||||
def test_aggregate_findings_reflects_mute_between_runs(
|
||||
self,
|
||||
tenants_fixture,
|
||||
scans_fixture,
|
||||
findings_fixture,
|
||||
):
|
||||
"""Re-running `aggregate_findings` after a finding is muted between
|
||||
runs must move counters: the matching ScanSummary row's `fail`
|
||||
decrements and `muted` increments. Guards against a regression where
|
||||
upsert silently keeps stale values from the first run."""
|
||||
tenant = tenants_fixture[0]
|
||||
scan = scans_fixture[0]
|
||||
finding1, _ = findings_fixture # finding1 is FAIL and not muted.
|
||||
|
||||
aggregate_findings(str(tenant.id), str(scan.id))
|
||||
before = ScanSummary.all_objects.get(
|
||||
tenant_id=tenant.id,
|
||||
scan_id=scan.id,
|
||||
check_id=finding1.check_id,
|
||||
service="ec2",
|
||||
severity=finding1.severity,
|
||||
region="us-east-1",
|
||||
)
|
||||
assert before.fail == 1
|
||||
assert before.muted == 0
|
||||
|
||||
Finding.all_objects.filter(pk=finding1.pk).update(muted=True)
|
||||
|
||||
aggregate_findings(str(tenant.id), str(scan.id))
|
||||
after = ScanSummary.all_objects.get(pk=before.pk)
|
||||
|
||||
assert after.fail == 0
|
||||
assert after.muted == 1
|
||||
assert after.total == before.total
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
|
||||
@@ -2361,6 +2361,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
|
||||
@patch("tasks.tasks.chain")
|
||||
@patch("tasks.tasks.group")
|
||||
@patch("tasks.tasks.aggregate_attack_surface_task")
|
||||
@patch("tasks.tasks.aggregate_scan_category_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_scan_resource_group_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_daily_severity_task")
|
||||
@patch("tasks.tasks.perform_scan_summary_task")
|
||||
@@ -2371,6 +2374,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
mock_scan_summary_task,
|
||||
mock_daily_severity_task,
|
||||
mock_finding_group_task,
|
||||
mock_resource_group_task,
|
||||
mock_category_task,
|
||||
mock_attack_surface_task,
|
||||
mock_group,
|
||||
mock_chain,
|
||||
):
|
||||
@@ -2383,8 +2389,8 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
yesterday = today - timedelta(days=1)
|
||||
|
||||
mock_outer_group_result = MagicMock()
|
||||
# The first `group()` call wraps the inner (severity, finding-group)
|
||||
# parallel step; subsequent calls wrap the outer per-scan generator.
|
||||
# The first `group()` call wraps the inner parallel step; subsequent
|
||||
# calls wrap the outer per-scan generator.
|
||||
mock_group.side_effect = lambda *args, **kwargs: (
|
||||
list(args[0]) if args and hasattr(args[0], "__iter__") else None,
|
||||
mock_outer_group_result,
|
||||
@@ -2420,6 +2426,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
mock_scan_summary_task,
|
||||
mock_daily_severity_task,
|
||||
mock_finding_group_task,
|
||||
mock_resource_group_task,
|
||||
mock_category_task,
|
||||
mock_attack_surface_task,
|
||||
):
|
||||
assert task_mock.si.call_count == 3
|
||||
dispatched = {
|
||||
@@ -2433,6 +2442,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
|
||||
@patch("tasks.tasks.chain")
|
||||
@patch("tasks.tasks.group")
|
||||
@patch("tasks.tasks.aggregate_attack_surface_task")
|
||||
@patch("tasks.tasks.aggregate_scan_category_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_scan_resource_group_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
|
||||
@patch("tasks.tasks.aggregate_daily_severity_task")
|
||||
@patch("tasks.tasks.perform_scan_summary_task")
|
||||
@@ -2443,6 +2455,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
mock_scan_summary_task,
|
||||
mock_daily_severity_task,
|
||||
mock_finding_group_task,
|
||||
mock_resource_group_task,
|
||||
mock_category_task,
|
||||
mock_attack_surface_task,
|
||||
mock_group,
|
||||
mock_chain,
|
||||
):
|
||||
@@ -2481,6 +2496,9 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
mock_scan_summary_task,
|
||||
mock_daily_severity_task,
|
||||
mock_finding_group_task,
|
||||
mock_resource_group_task,
|
||||
mock_category_task,
|
||||
mock_attack_surface_task,
|
||||
):
|
||||
task_mock.si.assert_called_once_with(
|
||||
tenant_id=self.tenant_id, scan_id=str(latest_scan_today)
|
||||
|
||||
Reference in New Issue
Block a user