mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-05-06 08:47:18 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0796ae44d6 | |||
| 7f1591780f | |||
| 7824bb5e36 |
@@ -8,6 +8,10 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
|
||||
- Bump Poetry to `2.3.4` in Dockerfile and pre-commit hooks. Regenerate `api/poetry.lock` [(#10681)](https://github.com/prowler-cloud/prowler/pull/10681)
|
||||
|
||||
### 🐞 Fixed
|
||||
|
||||
- `/finding-groups/latest` now self-heals drifted `FindingGroupDailySummary` rows by detecting missing provider/day summaries and enqueuing a targeted reaggregation task, with per-tenant throttling to keep request latency unaffected [(#10693)](https://github.com/prowler-cloud/prowler/pull/10693)
|
||||
|
||||
### 🔐 Security
|
||||
|
||||
- `pytest` from 8.2.2 to 9.0.3 to fix CVE-2025-71176 [(#10678)](https://github.com/prowler-cloud/prowler/pull/10678)
|
||||
|
||||
@@ -16782,6 +16782,176 @@ class TestFindingGroupViewSet:
|
||||
assert attrs["resources_total"] == 3
|
||||
assert attrs["resources_fail"] == 2
|
||||
|
||||
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
|
||||
def test_finding_groups_latest_check_id_self_heals_when_summary_missing(
|
||||
self,
|
||||
mock_reaggregate_delay,
|
||||
authenticated_client,
|
||||
providers_fixture,
|
||||
resources_fixture,
|
||||
):
|
||||
"""If /latest summary rows are missing, trigger background reaggregation."""
|
||||
provider = providers_fixture[0]
|
||||
resource = resources_fixture[0]
|
||||
check_id = "self_heal_missing_summary_check"
|
||||
|
||||
latest_scan = Scan.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
provider=provider,
|
||||
state=StateChoices.COMPLETED,
|
||||
trigger=Scan.TriggerChoices.MANUAL,
|
||||
completed_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
finding = Finding.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
uid="self_heal_missing_summary_finding",
|
||||
scan=latest_scan,
|
||||
delta="new",
|
||||
status="FAIL",
|
||||
severity="critical",
|
||||
impact="critical",
|
||||
check_id=check_id,
|
||||
check_metadata={"CheckId": check_id, "checktitle": "Self heal check"},
|
||||
first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=5),
|
||||
muted=False,
|
||||
)
|
||||
finding.add_resources([resource])
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-group-latest"),
|
||||
{"filter[check_id]": check_id},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
data = response.json()["data"]
|
||||
assert len(data) == 0
|
||||
mock_reaggregate_delay.assert_called_once_with(
|
||||
tenant_id=str(provider.tenant_id),
|
||||
scan_ids=[str(latest_scan.id)],
|
||||
)
|
||||
|
||||
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
|
||||
def test_finding_groups_latest_check_id_no_self_heal_when_summary_matches(
|
||||
self,
|
||||
mock_reaggregate_delay,
|
||||
authenticated_client,
|
||||
providers_fixture,
|
||||
resources_fixture,
|
||||
):
|
||||
"""When summary and finding-level metrics match, /latest must not trigger reaggregation."""
|
||||
from tasks.jobs.scan import aggregate_finding_group_summaries
|
||||
|
||||
provider = providers_fixture[0]
|
||||
resource = resources_fixture[0]
|
||||
check_id = "self_heal_in_sync_summary_check"
|
||||
|
||||
latest_scan = Scan.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
provider=provider,
|
||||
state=StateChoices.COMPLETED,
|
||||
trigger=Scan.TriggerChoices.MANUAL,
|
||||
completed_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
finding = Finding.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
uid="self_heal_in_sync_summary_finding",
|
||||
scan=latest_scan,
|
||||
delta="new",
|
||||
status="FAIL",
|
||||
severity="critical",
|
||||
impact="critical",
|
||||
check_id=check_id,
|
||||
check_metadata={"CheckId": check_id, "checktitle": "Self heal check"},
|
||||
first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=5),
|
||||
muted=False,
|
||||
)
|
||||
finding.add_resources([resource])
|
||||
|
||||
aggregate_finding_group_summaries(
|
||||
tenant_id=str(provider.tenant_id),
|
||||
scan_id=str(latest_scan.id),
|
||||
)
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-group-latest"),
|
||||
{"filter[check_id]": check_id},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
data = response.json()["data"]
|
||||
assert len(data) == 1
|
||||
attrs = data[0]["attributes"]
|
||||
assert attrs["check_id"] == check_id
|
||||
assert attrs["fail_count"] == 1
|
||||
assert attrs["resources_fail"] == 1
|
||||
assert attrs["resources_total"] == 1
|
||||
mock_reaggregate_delay.assert_not_called()
|
||||
|
||||
@patch("api.v1.views.reaggregate_finding_group_summaries_for_scans_task.delay")
|
||||
def test_finding_groups_latest_no_filter_self_heals_when_summary_missing(
|
||||
self,
|
||||
mock_reaggregate_delay,
|
||||
authenticated_client,
|
||||
providers_fixture,
|
||||
resources_fixture,
|
||||
):
|
||||
"""Self-healing must also work without any finding-group filters."""
|
||||
provider = providers_fixture[0]
|
||||
resource = resources_fixture[0]
|
||||
check_id = "self_heal_missing_summary_no_filter_check"
|
||||
|
||||
latest_scan = Scan.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
provider=provider,
|
||||
state=StateChoices.COMPLETED,
|
||||
trigger=Scan.TriggerChoices.MANUAL,
|
||||
completed_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
finding = Finding.objects.create(
|
||||
tenant_id=provider.tenant_id,
|
||||
uid="self_heal_missing_summary_no_filter_finding",
|
||||
scan=latest_scan,
|
||||
delta="new",
|
||||
status="FAIL",
|
||||
severity="critical",
|
||||
impact="critical",
|
||||
check_id=check_id,
|
||||
check_metadata={"CheckId": check_id, "checktitle": "Self heal no filter"},
|
||||
first_seen_at=datetime.now(timezone.utc) - timedelta(minutes=2),
|
||||
muted=False,
|
||||
)
|
||||
finding.add_resources([resource])
|
||||
|
||||
response = authenticated_client.get(reverse("finding-group-latest"))
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
data = response.json()["data"]
|
||||
check_ids = {item["id"] for item in data}
|
||||
assert check_id not in check_ids
|
||||
mock_reaggregate_delay.assert_called_once_with(
|
||||
tenant_id=str(provider.tenant_id),
|
||||
scan_ids=[str(latest_scan.id)],
|
||||
)
|
||||
|
||||
@patch("api.v1.views.cache.add", return_value=False)
|
||||
@patch("api.v1.views.FindingGroupViewSet._latest_scan_ids_missing_summary")
|
||||
def test_finding_groups_latest_self_heal_probe_throttled(
|
||||
self,
|
||||
mock_missing_summary_check,
|
||||
mock_cache_add,
|
||||
authenticated_client,
|
||||
finding_groups_fixture,
|
||||
):
|
||||
"""When probe cache is warm, /latest should skip drift-detection queries."""
|
||||
response = authenticated_client.get(reverse("finding-group-latest"))
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
mock_missing_summary_check.assert_not_called()
|
||||
mock_cache_add.assert_called_once()
|
||||
|
||||
def test_finding_groups_latest_provider_type_filter(
|
||||
self, authenticated_client, finding_groups_fixture
|
||||
):
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import fnmatch
|
||||
import glob
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -28,6 +29,7 @@ from dj_rest_auth.registration.views import SocialLoginView
|
||||
from django.conf import settings as django_settings
|
||||
from django.contrib.postgres.aggregates import ArrayAgg, BoolAnd, StringAgg
|
||||
from django.contrib.postgres.search import SearchQuery
|
||||
from django.core.cache import cache
|
||||
from django.db import transaction
|
||||
from django.db.models import (
|
||||
BooleanField,
|
||||
@@ -97,6 +99,7 @@ from tasks.tasks import (
|
||||
mute_historical_findings_task,
|
||||
perform_scan_task,
|
||||
reaggregate_all_finding_group_summaries_task,
|
||||
reaggregate_finding_group_summaries_for_scans_task,
|
||||
refresh_lighthouse_provider_models_task,
|
||||
)
|
||||
|
||||
@@ -7350,6 +7353,15 @@ class FindingGroupViewSet(BaseRLSViewSet):
|
||||
"provider.alias": "provider_alias",
|
||||
}
|
||||
|
||||
# Self-heal throttling for /finding-groups/latest:
|
||||
# - Probe TTL (60s): run drift detection at most once per minute per tenant to
|
||||
# avoid extra DB checks on every request.
|
||||
# - Trigger TTL (300s): enqueue at most one reaggregation every 5 minutes per
|
||||
# tenant *for the same drifted scan set* to prevent task storms while still
|
||||
# healing stale summaries quickly.
|
||||
_LATEST_SELF_HEAL_PROBE_CACHE_TTL_SECONDS = 60
|
||||
_LATEST_SELF_HEAL_TRIGGER_CACHE_TTL_SECONDS = 300
|
||||
|
||||
def _validate_sort_fields(self, sort_param, sort_field_map=None):
|
||||
"""Validate and map JSON:API sort fields using the given field map."""
|
||||
if sort_field_map is None:
|
||||
@@ -7671,6 +7683,144 @@ class FindingGroupViewSet(BaseRLSViewSet):
|
||||
)
|
||||
return self._aggregate_daily_summaries(clean_queryset)
|
||||
|
||||
def _should_self_heal_latest(self, finding_params: QueryDict) -> bool:
|
||||
"""Run drift detection for /latest summary-path queries."""
|
||||
if self._requires_finding_level_aggregation(finding_params, latest=True):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _latest_self_heal_probe_cache_key(self) -> str:
|
||||
return f"finding_groups:self_heal:probe:tenant:{self.request.tenant_id}"
|
||||
|
||||
def _latest_self_heal_trigger_cache_key(self, scan_ids: list[str]) -> str:
|
||||
scan_key = ",".join(sorted(scan_ids))
|
||||
digest = hashlib.sha256(scan_key.encode("utf-8")).hexdigest()[:16]
|
||||
return (
|
||||
f"finding_groups:self_heal:trigger:tenant:{self.request.tenant_id}:{digest}"
|
||||
)
|
||||
|
||||
def _should_probe_latest_self_heal(self) -> bool:
|
||||
"""Throttle drift-detection probes to keep /latest lightweight under load."""
|
||||
return cache.add(
|
||||
self._latest_self_heal_probe_cache_key(),
|
||||
"1",
|
||||
timeout=self._LATEST_SELF_HEAL_PROBE_CACHE_TTL_SECONDS,
|
||||
)
|
||||
|
||||
def _latest_scans_for_visible_providers(self, finding_params: QueryDict):
|
||||
"""Return latest completed scans (1 per provider), scoped to RBAC and provider filters."""
|
||||
tenant_id = self.request.tenant_id
|
||||
role = get_role(self.request.user, tenant_id)
|
||||
queryset = Scan.objects.filter(
|
||||
tenant_id=tenant_id, state=StateChoices.COMPLETED
|
||||
)
|
||||
|
||||
if not role.unlimited_visibility:
|
||||
queryset = queryset.filter(provider_id__in=get_providers(role))
|
||||
|
||||
provider_id = finding_params.get("provider_id")
|
||||
if provider_id:
|
||||
queryset = queryset.filter(provider_id=provider_id)
|
||||
|
||||
provider_ids_in = finding_params.get("provider_id__in")
|
||||
if provider_ids_in:
|
||||
provider_ids = [
|
||||
pid.strip() for pid in provider_ids_in.split(",") if pid.strip()
|
||||
]
|
||||
queryset = queryset.filter(provider_id__in=provider_ids)
|
||||
|
||||
provider_type = finding_params.get("provider_type")
|
||||
if provider_type:
|
||||
queryset = queryset.filter(provider__provider=provider_type)
|
||||
|
||||
provider_type_in = finding_params.get("provider_type__in")
|
||||
if provider_type_in:
|
||||
provider_types = [
|
||||
provider.strip()
|
||||
for provider in provider_type_in.split(",")
|
||||
if provider.strip()
|
||||
]
|
||||
queryset = queryset.filter(provider__provider__in=provider_types)
|
||||
|
||||
return list(
|
||||
queryset.order_by("provider_id", "-completed_at", "-inserted_at")
|
||||
.distinct("provider_id")
|
||||
.values("id", "provider_id", "completed_at")
|
||||
)
|
||||
|
||||
def _latest_scan_ids_missing_summary(self, finding_params: QueryDict) -> list[str]:
|
||||
"""Return latest scan ids whose provider/day has findings but no summary rows."""
|
||||
latest_scans = self._latest_scans_for_visible_providers(finding_params)
|
||||
if not latest_scans:
|
||||
return []
|
||||
|
||||
latest_scan_ids = [row["id"] for row in latest_scans]
|
||||
scans_with_findings = set(
|
||||
Finding.all_objects.filter(
|
||||
tenant_id=self.request.tenant_id,
|
||||
scan_id__in=latest_scan_ids,
|
||||
)
|
||||
.values_list("scan_id", flat=True)
|
||||
.distinct()
|
||||
)
|
||||
|
||||
expected_scan_by_provider_day = {}
|
||||
for row in latest_scans:
|
||||
if row["id"] not in scans_with_findings or not row.get("completed_at"):
|
||||
continue
|
||||
key = (row["provider_id"], row["completed_at"].date())
|
||||
expected_scan_by_provider_day[key] = str(row["id"])
|
||||
|
||||
if not expected_scan_by_provider_day:
|
||||
return []
|
||||
|
||||
provider_ids = {
|
||||
provider_id for provider_id, _ in expected_scan_by_provider_day.keys()
|
||||
}
|
||||
dates = [date_value for _, date_value in expected_scan_by_provider_day.keys()]
|
||||
start = datetime.combine(min(dates), datetime.min.time(), tzinfo=timezone.utc)
|
||||
end = datetime.combine(
|
||||
max(dates) + timedelta(days=1), datetime.min.time(), tzinfo=timezone.utc
|
||||
)
|
||||
|
||||
summary_rows = FindingGroupDailySummary.objects.filter(
|
||||
tenant_id=self.request.tenant_id,
|
||||
provider_id__in=provider_ids,
|
||||
inserted_at__gte=start,
|
||||
inserted_at__lt=end,
|
||||
).values_list("provider_id", "inserted_at")
|
||||
|
||||
summary_keys = {
|
||||
(provider_id, inserted_at.date())
|
||||
for provider_id, inserted_at in summary_rows
|
||||
}
|
||||
missing_keys = [
|
||||
key
|
||||
for key in expected_scan_by_provider_day.keys()
|
||||
if key not in summary_keys
|
||||
]
|
||||
return [expected_scan_by_provider_day[key] for key in missing_keys]
|
||||
|
||||
def _trigger_latest_self_heal_reaggregation(self, scan_ids: list[str]):
|
||||
"""Enqueue targeted reaggregation at-most-once-per-window per drifted scan set."""
|
||||
deduped_scan_ids = sorted(set(scan_ids))
|
||||
if not deduped_scan_ids:
|
||||
return
|
||||
tenant_id = str(self.request.tenant_id)
|
||||
cache_key = self._latest_self_heal_trigger_cache_key(deduped_scan_ids)
|
||||
if cache.add(
|
||||
cache_key, "1", timeout=self._LATEST_SELF_HEAL_TRIGGER_CACHE_TTL_SECONDS
|
||||
):
|
||||
reaggregate_finding_group_summaries_for_scans_task.delay(
|
||||
tenant_id=tenant_id,
|
||||
scan_ids=deduped_scan_ids,
|
||||
)
|
||||
logger.info(
|
||||
"Triggered finding-group summaries self-heal reaggregation for tenant %s (%d scans)",
|
||||
tenant_id,
|
||||
len(deduped_scan_ids),
|
||||
)
|
||||
|
||||
def _sorted_paginated_response(
|
||||
self,
|
||||
request,
|
||||
@@ -7849,11 +7999,20 @@ class FindingGroupViewSet(BaseRLSViewSet):
|
||||
finding_params, computed_params = self._split_computed_aggregate_filters(
|
||||
normalized_params
|
||||
)
|
||||
aggregated_qs = self._build_aggregated_queryset(finding_params, latest=True)
|
||||
aggregated_qs = self._apply_aggregated_computed_filters(
|
||||
aggregated_qs, computed_params
|
||||
summary_qs = self._build_aggregated_queryset(finding_params, latest=True)
|
||||
summary_qs = self._apply_aggregated_computed_filters(
|
||||
summary_qs, computed_params
|
||||
)
|
||||
return self._sorted_paginated_response(request, aggregated_qs)
|
||||
|
||||
if (
|
||||
self._should_self_heal_latest(finding_params)
|
||||
and self._should_probe_latest_self_heal()
|
||||
):
|
||||
drifted_scan_ids = self._latest_scan_ids_missing_summary(finding_params)
|
||||
if drifted_scan_ids:
|
||||
self._trigger_latest_self_heal_reaggregation(drifted_scan_ids)
|
||||
|
||||
return self._sorted_paginated_response(request, summary_qs)
|
||||
|
||||
@extend_schema(
|
||||
summary="List resources for a finding group",
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import warnings
|
||||
|
||||
from celery import Celery, Task
|
||||
|
||||
from config.env import env
|
||||
|
||||
# Suppress specific warnings from django-rest-auth: https://github.com/iMerica/dj-rest-auth/issues/684
|
||||
@@ -57,7 +56,11 @@ class RLSTask(Task):
|
||||
from api.db_utils import rls_transaction
|
||||
|
||||
tenant_id = kwargs.get("tenant_id")
|
||||
with rls_transaction(tenant_id):
|
||||
# Tasks metadata rows are writes and must always run on primary DB.
|
||||
# If the request thread is currently pinned to a read-replica alias,
|
||||
# rls_transaction would otherwise set tenant context on that replica
|
||||
# connection while the write is routed to default, breaking RLS.
|
||||
with rls_transaction(tenant_id, using="default"):
|
||||
APITask.objects.update_or_create(
|
||||
id=task_result_instance.task_id,
|
||||
tenant_id=tenant_id,
|
||||
|
||||
@@ -591,21 +591,21 @@ def backfill_finding_group_summaries(tenant_id: str, days: int = None):
|
||||
completed_scans = (
|
||||
Scan.objects.filter(**scan_filter)
|
||||
.order_by("-completed_at")
|
||||
.values("id", "completed_at")
|
||||
.values("id", "provider_id", "completed_at")
|
||||
)
|
||||
|
||||
if not completed_scans:
|
||||
return {"status": "no scans to backfill"}
|
||||
|
||||
# Keep only latest scan per day
|
||||
latest_scans_by_day = {}
|
||||
# Keep only latest scan per provider/day
|
||||
latest_scans_by_provider_day = {}
|
||||
for scan in completed_scans:
|
||||
key = scan["completed_at"].date()
|
||||
if key not in latest_scans_by_day:
|
||||
latest_scans_by_day[key] = scan
|
||||
key = (scan["provider_id"], scan["completed_at"].date())
|
||||
if key not in latest_scans_by_provider_day:
|
||||
latest_scans_by_provider_day[key] = scan
|
||||
|
||||
# Process each day's scan
|
||||
for scan_date, scan in latest_scans_by_day.items():
|
||||
# Process each provider/day scan
|
||||
for _, scan in latest_scans_by_provider_day.items():
|
||||
scan_id = str(scan["id"])
|
||||
|
||||
try:
|
||||
|
||||
@@ -766,6 +766,31 @@ def aggregate_finding_group_summaries_task(tenant_id: str, scan_id: str):
|
||||
return aggregate_finding_group_summaries(tenant_id=tenant_id, scan_id=scan_id)
|
||||
|
||||
|
||||
@shared_task(
|
||||
base=RLSTask,
|
||||
name="reaggregate-finding-group-summaries-for-scans",
|
||||
queue="overview",
|
||||
)
|
||||
@set_tenant(keep_tenant=True)
|
||||
def reaggregate_finding_group_summaries_for_scans_task(
|
||||
tenant_id: str, scan_ids: list[str]
|
||||
):
|
||||
"""Reaggregate finding group summaries for a targeted list of scans."""
|
||||
deduped_scan_ids = list(dict.fromkeys(scan_id for scan_id in scan_ids if scan_id))
|
||||
if not deduped_scan_ids:
|
||||
return {"scans_reaggregated": 0}
|
||||
|
||||
logger.info(
|
||||
"Reaggregating finding group summaries for %d targeted scans",
|
||||
len(deduped_scan_ids),
|
||||
)
|
||||
group(
|
||||
aggregate_finding_group_summaries_task.si(tenant_id=tenant_id, scan_id=scan_id)
|
||||
for scan_id in deduped_scan_ids
|
||||
).apply_async()
|
||||
return {"scans_reaggregated": len(deduped_scan_ids)}
|
||||
|
||||
|
||||
@shared_task(
|
||||
base=RLSTask, name="reaggregate-all-finding-group-summaries", queue="overview"
|
||||
)
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
from datetime import datetime, timezone
|
||||
from contextlib import nullcontext
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
import pytest
|
||||
from tasks.jobs.backfill import (
|
||||
backfill_compliance_summaries,
|
||||
backfill_finding_group_summaries,
|
||||
backfill_provider_compliance_scores,
|
||||
backfill_resource_scan_summaries,
|
||||
backfill_scan_category_summaries,
|
||||
@@ -225,6 +227,75 @@ class TestBackfillComplianceSummaries:
|
||||
assert summary.total_requirements == expected_counts["total_requirements"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBackfillFindingGroupSummaries:
|
||||
@patch("tasks.jobs.backfill.aggregate_finding_group_summaries")
|
||||
@patch("tasks.jobs.backfill.Scan.objects.filter")
|
||||
@patch("tasks.jobs.backfill.rls_transaction")
|
||||
def test_keeps_latest_scan_per_provider_per_day(
|
||||
self,
|
||||
mock_rls_transaction,
|
||||
mock_scan_filter,
|
||||
mock_aggregate_finding_group_summaries,
|
||||
):
|
||||
tenant_id = str(uuid4())
|
||||
provider_1 = uuid4()
|
||||
provider_2 = uuid4()
|
||||
|
||||
now = datetime.now(tz=timezone.utc)
|
||||
yesterday = now - timedelta(days=1)
|
||||
latest_p1_today = uuid4()
|
||||
older_p1_today = uuid4()
|
||||
latest_p2_today = uuid4()
|
||||
latest_p1_yesterday = uuid4()
|
||||
|
||||
mock_rls_transaction.side_effect = lambda *args, **kwargs: nullcontext()
|
||||
mock_scan_filter.return_value.order_by.return_value.values.return_value = [
|
||||
{
|
||||
"id": latest_p1_today,
|
||||
"provider_id": provider_1,
|
||||
"completed_at": now,
|
||||
},
|
||||
{
|
||||
"id": latest_p2_today,
|
||||
"provider_id": provider_2,
|
||||
"completed_at": now - timedelta(minutes=2),
|
||||
},
|
||||
{
|
||||
"id": older_p1_today,
|
||||
"provider_id": provider_1,
|
||||
"completed_at": now - timedelta(hours=2),
|
||||
},
|
||||
{
|
||||
"id": latest_p1_yesterday,
|
||||
"provider_id": provider_1,
|
||||
"completed_at": yesterday,
|
||||
},
|
||||
]
|
||||
mock_aggregate_finding_group_summaries.return_value = {
|
||||
"status": "completed",
|
||||
"created": 1,
|
||||
"updated": 0,
|
||||
}
|
||||
|
||||
result = backfill_finding_group_summaries(tenant_id=tenant_id)
|
||||
|
||||
assert result["status"] == "backfilled"
|
||||
assert result["scans_processed"] == 3
|
||||
assert result["scans_skipped"] == 0
|
||||
assert mock_aggregate_finding_group_summaries.call_count == 3
|
||||
|
||||
called_scan_ids = {
|
||||
call.args[1]
|
||||
for call in mock_aggregate_finding_group_summaries.call_args_list
|
||||
}
|
||||
assert called_scan_ids == {
|
||||
str(latest_p1_today),
|
||||
str(latest_p2_today),
|
||||
str(latest_p1_yesterday),
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestBackfillScanCategorySummaries:
|
||||
def test_already_backfilled(self, scan_category_summary_fixture):
|
||||
|
||||
@@ -21,6 +21,7 @@ from tasks.tasks import (
|
||||
perform_attack_paths_scan_task,
|
||||
perform_scheduled_scan_task,
|
||||
reaggregate_all_finding_group_summaries_task,
|
||||
reaggregate_finding_group_summaries_for_scans_task,
|
||||
refresh_lighthouse_provider_models_task,
|
||||
s3_integration_task,
|
||||
security_hub_integration_task,
|
||||
@@ -2457,3 +2458,57 @@ class TestReaggregateAllFindingGroupSummaries:
|
||||
|
||||
assert result == {"scans_reaggregated": 0}
|
||||
mock_group.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestReaggregateFindingGroupSummariesForScans:
|
||||
def setup_method(self):
|
||||
self.tenant_id = str(uuid.uuid4())
|
||||
|
||||
@patch("tasks.tasks.group")
|
||||
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
|
||||
def test_dispatches_requested_scan_ids(self, mock_agg_task, mock_group):
|
||||
scan_id_1 = str(uuid.uuid4())
|
||||
scan_id_2 = str(uuid.uuid4())
|
||||
|
||||
mock_group_result = MagicMock()
|
||||
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
|
||||
|
||||
result = reaggregate_finding_group_summaries_for_scans_task(
|
||||
tenant_id=self.tenant_id,
|
||||
scan_ids=[scan_id_1, scan_id_2],
|
||||
)
|
||||
|
||||
assert result == {"scans_reaggregated": 2}
|
||||
mock_agg_task.si.assert_any_call(tenant_id=self.tenant_id, scan_id=scan_id_1)
|
||||
mock_agg_task.si.assert_any_call(tenant_id=self.tenant_id, scan_id=scan_id_2)
|
||||
assert mock_agg_task.si.call_count == 2
|
||||
mock_group_result.apply_async.assert_called_once()
|
||||
|
||||
@patch("tasks.tasks.group")
|
||||
@patch("tasks.tasks.aggregate_finding_group_summaries_task")
|
||||
def test_dedupes_scan_ids_before_dispatch(self, mock_agg_task, mock_group):
|
||||
scan_id_1 = str(uuid.uuid4())
|
||||
scan_id_2 = str(uuid.uuid4())
|
||||
|
||||
mock_group_result = MagicMock()
|
||||
mock_group.side_effect = lambda gen: (list(gen), mock_group_result)[1]
|
||||
|
||||
result = reaggregate_finding_group_summaries_for_scans_task(
|
||||
tenant_id=self.tenant_id,
|
||||
scan_ids=[scan_id_1, scan_id_2, scan_id_1, ""],
|
||||
)
|
||||
|
||||
assert result == {"scans_reaggregated": 2}
|
||||
assert mock_agg_task.si.call_count == 2
|
||||
mock_group_result.apply_async.assert_called_once()
|
||||
|
||||
@patch("tasks.tasks.group")
|
||||
def test_empty_scan_ids_skips_dispatch(self, mock_group):
|
||||
result = reaggregate_finding_group_summaries_for_scans_task(
|
||||
tenant_id=self.tenant_id,
|
||||
scan_ids=[],
|
||||
)
|
||||
|
||||
assert result == {"scans_reaggregated": 0}
|
||||
mock_group.assert_not_called()
|
||||
|
||||
Reference in New Issue
Block a user