fix(api): align get_compliance_frameworks with Compliance.get_bulk (#10903)

This commit is contained in:
Josema Camacho
2026-04-27 18:10:08 +02:00
committed by GitHub
parent df76efc197
commit 15ca69942d
3 changed files with 71 additions and 8 deletions
+4
View File
@@ -16,6 +16,10 @@ All notable changes to the **Prowler API** are documented in this file.
- `aggregate_findings`, `aggregate_attack_surface`, `aggregate_scan_resource_group_summaries` and `aggregate_scan_category_summaries` now upsert via `bulk_create(update_conflicts=True, ...)` instead of the prior `ignore_conflicts=True` / plain INSERT / `already backfilled` short-circuit. Re-runs triggered by the post-mute reaggregation pipeline no longer trip the `unique_*_per_scan` constraints nor silently drop updates, and are race-safe under concurrent writers (e.g. scan completion overlapping with a fresh mute rule) [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
- Rename the scan-category and scan-resource-group summary aggregators from `backfill_*` to `aggregate_*` (`backfill_scan_category_summaries` -> `aggregate_scan_category_summaries`, `backfill_scan_resource_group_summaries` -> `aggregate_scan_resource_group_summaries`; Celery task names `backfill-scan-category-summaries` -> `scan-category-summaries`, `backfill-scan-resource-group-summaries` -> `scan-resource-group-summaries`) and move them to the `overview` queue, matching the sibling per-scan aggregators (`perform_scan_summary_task`, `aggregate_daily_severity_task`, `aggregate_finding_group_summaries_task`, `aggregate_attack_surface_task`). The old names had no dispatchers outside the post-mute reaggregation chain, so no task-registry migration is required [(#10843)](https://github.com/prowler-cloud/prowler/pull/10843)
### 🐞 Fixed
- `generate_outputs_task` crashing with `KeyError` for compliance frameworks listed by `get_compliance_frameworks` but not loadable by `Compliance.get_bulk` [(#10903)](https://github.com/prowler-cloud/prowler/pull/10903)
---
## [1.25.4] (Prowler v5.24.4)
+7 -8
View File
@@ -1,7 +1,6 @@
from collections.abc import Iterable, Mapping
from api.models import Provider
from prowler.config.config import get_available_compliance_frameworks
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.models import CheckMetadata
@@ -95,12 +94,12 @@ PROWLER_CHECKS = LazyChecksMapping()
def get_compliance_frameworks(provider_type: Provider.ProviderChoices) -> list[str]:
"""
Retrieve and cache the list of available compliance frameworks for a specific cloud provider.
"""List compliance frameworks the API can load for `provider_type`.
This function lazily loads and caches the available compliance frameworks (e.g., CIS, MITRE, ISO)
for each provider type (AWS, Azure, GCP, etc.) on first access. Subsequent calls for the same
provider will return the cached result.
The list is sourced from `Compliance.get_bulk` so that the names
returned here are guaranteed to be loadable by the bulk loader. This
prevents downstream key mismatches (e.g. CSV report generation iterating
framework names and looking them up in the bulk dict).
Args:
provider_type (Provider.ProviderChoices): The cloud provider type for which to retrieve
@@ -112,8 +111,8 @@ def get_compliance_frameworks(provider_type: Provider.ProviderChoices) -> list[s
"""
global AVAILABLE_COMPLIANCE_FRAMEWORKS
if provider_type not in AVAILABLE_COMPLIANCE_FRAMEWORKS:
AVAILABLE_COMPLIANCE_FRAMEWORKS[provider_type] = (
get_available_compliance_frameworks(provider_type)
AVAILABLE_COMPLIANCE_FRAMEWORKS[provider_type] = list(
Compliance.get_bulk(provider_type).keys()
)
return AVAILABLE_COMPLIANCE_FRAMEWORKS[provider_type]
@@ -1,13 +1,18 @@
from unittest.mock import MagicMock, patch
import pytest
from api import compliance as compliance_module
from api.compliance import (
generate_compliance_overview_template,
generate_scan_compliance,
get_compliance_frameworks,
get_prowler_provider_checks,
get_prowler_provider_compliance,
load_prowler_checks,
)
from api.models import Provider
from prowler.lib.check.compliance_models import Compliance
class TestCompliance:
@@ -250,3 +255,58 @@ class TestCompliance:
}
assert template == expected_template
@pytest.fixture
def reset_compliance_cache():
"""Reset the module-level cache so each test starts cold."""
previous = dict(compliance_module.AVAILABLE_COMPLIANCE_FRAMEWORKS)
compliance_module.AVAILABLE_COMPLIANCE_FRAMEWORKS.clear()
try:
yield
finally:
compliance_module.AVAILABLE_COMPLIANCE_FRAMEWORKS.clear()
compliance_module.AVAILABLE_COMPLIANCE_FRAMEWORKS.update(previous)
class TestGetComplianceFrameworks:
def test_returns_keys_from_compliance_get_bulk(self, reset_compliance_cache):
with patch("api.compliance.Compliance") as mock_compliance:
mock_compliance.get_bulk.return_value = {
"cis_1.4_aws": MagicMock(),
"mitre_attack_aws": MagicMock(),
}
result = get_compliance_frameworks(Provider.ProviderChoices.AWS)
assert sorted(result) == ["cis_1.4_aws", "mitre_attack_aws"]
mock_compliance.get_bulk.assert_called_once_with(Provider.ProviderChoices.AWS)
def test_caches_result_per_provider(self, reset_compliance_cache):
with patch("api.compliance.Compliance") as mock_compliance:
mock_compliance.get_bulk.return_value = {"cis_1.4_aws": MagicMock()}
get_compliance_frameworks(Provider.ProviderChoices.AWS)
get_compliance_frameworks(Provider.ProviderChoices.AWS)
# Cached after first call.
assert mock_compliance.get_bulk.call_count == 1
@pytest.mark.parametrize(
"provider_type",
[choice.value for choice in Provider.ProviderChoices],
)
def test_listing_is_subset_of_bulk(self, reset_compliance_cache, provider_type):
"""Regression for CLOUD-API-40S: every name returned by
``get_compliance_frameworks`` must be loadable via ``Compliance.get_bulk``.
A divergence here is what produced ``KeyError: 'csa_ccm_4.0'`` in
``generate_outputs_task`` after universal/multi-provider compliance
JSONs were introduced at the top-level ``prowler/compliance/`` path.
"""
bulk_keys = set(Compliance.get_bulk(provider_type).keys())
listed = set(get_compliance_frameworks(provider_type))
missing = listed - bulk_keys
assert not missing, (
f"get_compliance_frameworks({provider_type!r}) returned names not "
f"loadable by Compliance.get_bulk: {sorted(missing)}"
)