mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
feat(compliance): add DORA framework for AWS (#11131)
This commit is contained in:
+2
-1
@@ -2,12 +2,13 @@
|
||||
|
||||
All notable changes to the **Prowler API** are documented in this file.
|
||||
|
||||
## [1.31.0] (Prowler v5.30.0)
|
||||
## [1.31.0] (Prowler UNRELEASED)
|
||||
|
||||
### 🚀 Added
|
||||
|
||||
- Automatic recovery of allowlisted idempotent background tasks whose worker died during a deploy or crash: stuck scan and summary tasks are detected and re-run instead of staying pending forever, with a `reconcile_orphan_tasks` management command for on-demand recovery [(#11416)](https://github.com/prowler-cloud/prowler/pull/11416)
|
||||
- Jira integration no longer creates duplicate issues on a retried send; findings already ticketed are skipped [(#11416)](https://github.com/prowler-cloud/prowler/pull/11416)
|
||||
- DORA compliance framework support [(#11131)](https://github.com/prowler-cloud/prowler/pull/11131)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from collections.abc import Iterable, Mapping
|
||||
|
||||
from api.models import Provider
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.check.compliance_models import (
|
||||
get_bulk_compliance_frameworks_universal,
|
||||
)
|
||||
from prowler.lib.check.models import CheckMetadata
|
||||
|
||||
AVAILABLE_COMPLIANCE_FRAMEWORKS = {}
|
||||
@@ -94,25 +96,22 @@ PROWLER_CHECKS = LazyChecksMapping()
|
||||
|
||||
|
||||
def get_compliance_frameworks(provider_type: Provider.ProviderChoices) -> list[str]:
|
||||
"""List compliance frameworks the API can load for `provider_type`.
|
||||
"""List compliance framework identifiers available for `provider_type`.
|
||||
|
||||
The list is sourced from `Compliance.get_bulk` so that the names
|
||||
returned here are guaranteed to be loadable by the bulk loader. This
|
||||
prevents downstream key mismatches (e.g. CSV report generation iterating
|
||||
framework names and looking them up in the bulk dict).
|
||||
Includes both per-provider frameworks and universal top-level frameworks
|
||||
(e.g. ``dora``, ``csa_ccm_4.0``).
|
||||
|
||||
Args:
|
||||
provider_type (Provider.ProviderChoices): The cloud provider type for which to retrieve
|
||||
available compliance frameworks (e.g., "aws", "azure", "gcp", "m365").
|
||||
provider_type (Provider.ProviderChoices): The cloud provider type
|
||||
(e.g., "aws", "azure", "gcp", "m365").
|
||||
|
||||
Returns:
|
||||
list[str]: A list of framework identifiers (e.g., "cis_1.4_aws", "mitre_attack_azure") available
|
||||
for the given provider.
|
||||
list[str]: Framework identifiers (e.g., "cis_1.4_aws", "dora").
|
||||
"""
|
||||
global AVAILABLE_COMPLIANCE_FRAMEWORKS
|
||||
if provider_type not in AVAILABLE_COMPLIANCE_FRAMEWORKS:
|
||||
AVAILABLE_COMPLIANCE_FRAMEWORKS[provider_type] = list(
|
||||
Compliance.get_bulk(provider_type).keys()
|
||||
get_bulk_compliance_frameworks_universal(provider_type).keys()
|
||||
)
|
||||
|
||||
return AVAILABLE_COMPLIANCE_FRAMEWORKS[provider_type]
|
||||
@@ -139,18 +138,14 @@ def get_prowler_provider_compliance(provider_type: Provider.ProviderChoices) ->
|
||||
"""
|
||||
Retrieve the Prowler compliance data for a specified provider type.
|
||||
|
||||
This function fetches the compliance frameworks and their associated
|
||||
requirements for the given cloud provider.
|
||||
|
||||
Args:
|
||||
provider_type (Provider.ProviderChoices): The provider type
|
||||
(e.g., 'aws', 'azure') for which to retrieve compliance data.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary mapping compliance framework names to their respective
|
||||
Compliance objects for the specified provider.
|
||||
dict: Mapping of framework name to `ComplianceFramework` for the provider.
|
||||
"""
|
||||
return Compliance.get_bulk(provider_type)
|
||||
return get_bulk_compliance_frameworks_universal(provider_type)
|
||||
|
||||
|
||||
def _load_provider_assets(provider_type: Provider.ProviderChoices) -> tuple[dict, dict]:
|
||||
@@ -209,8 +204,8 @@ def load_prowler_checks(
|
||||
for compliance_name, compliance_data in prowler_compliance.get(
|
||||
provider_type, {}
|
||||
).items():
|
||||
for requirement in compliance_data.Requirements:
|
||||
for check in requirement.Checks:
|
||||
for requirement in compliance_data.requirements:
|
||||
for check in requirement.checks.get(provider_type, []):
|
||||
try:
|
||||
checks[provider_type][check].add(compliance_name)
|
||||
except KeyError:
|
||||
@@ -290,24 +285,40 @@ def generate_compliance_overview_template(
|
||||
requirements_status = {"passed": 0, "failed": 0, "manual": 0}
|
||||
total_requirements = 0
|
||||
|
||||
for requirement in compliance_data.Requirements:
|
||||
for requirement in compliance_data.requirements:
|
||||
total_requirements += 1
|
||||
total_checks = len(requirement.Checks)
|
||||
checks_dict = {check: None for check in requirement.Checks}
|
||||
provider_check_list = list(requirement.checks.get(provider_type, []))
|
||||
total_checks = len(provider_check_list)
|
||||
checks_dict = {check: None for check in provider_check_list}
|
||||
|
||||
req_status_val = "MANUAL" if total_checks == 0 else "PASS"
|
||||
|
||||
# MITRE attrs are wrapped under `_raw_attributes` by the
|
||||
# universal adapter — unwrap so consumers see the flat list.
|
||||
requirement_attributes = requirement.attributes
|
||||
if (
|
||||
isinstance(requirement_attributes, dict)
|
||||
and "_raw_attributes" in requirement_attributes
|
||||
):
|
||||
attributes_payload = list(requirement_attributes["_raw_attributes"])
|
||||
elif isinstance(requirement_attributes, dict):
|
||||
attributes_payload = (
|
||||
[dict(requirement_attributes)] if requirement_attributes else []
|
||||
)
|
||||
else:
|
||||
attributes_payload = [
|
||||
dict(attribute) for attribute in requirement_attributes
|
||||
]
|
||||
|
||||
# Build requirement dictionary
|
||||
requirement_dict = {
|
||||
"name": requirement.Name or requirement.Id,
|
||||
"description": requirement.Description,
|
||||
"tactics": getattr(requirement, "Tactics", []),
|
||||
"subtechniques": getattr(requirement, "SubTechniques", []),
|
||||
"platforms": getattr(requirement, "Platforms", []),
|
||||
"technique_url": getattr(requirement, "TechniqueURL", ""),
|
||||
"attributes": [
|
||||
dict(attribute) for attribute in requirement.Attributes
|
||||
],
|
||||
"name": requirement.name or requirement.id,
|
||||
"description": requirement.description,
|
||||
"tactics": requirement.tactics or [],
|
||||
"subtechniques": requirement.sub_techniques or [],
|
||||
"platforms": requirement.platforms or [],
|
||||
"technique_url": requirement.technique_url or "",
|
||||
"attributes": attributes_payload,
|
||||
"checks": checks_dict,
|
||||
"checks_status": {
|
||||
"pass": 0,
|
||||
@@ -325,15 +336,15 @@ def generate_compliance_overview_template(
|
||||
requirements_status["passed"] += 1
|
||||
|
||||
# Add requirement to compliance requirements
|
||||
compliance_requirements[requirement.Id] = requirement_dict
|
||||
compliance_requirements[requirement.id] = requirement_dict
|
||||
|
||||
# Build compliance dictionary
|
||||
compliance_dict = {
|
||||
"framework": compliance_data.Framework,
|
||||
"name": compliance_data.Name,
|
||||
"version": compliance_data.Version,
|
||||
"framework": compliance_data.framework,
|
||||
"name": compliance_data.name,
|
||||
"version": compliance_data.version,
|
||||
"provider": provider_type,
|
||||
"description": compliance_data.Description,
|
||||
"description": compliance_data.description,
|
||||
"requirements": compliance_requirements,
|
||||
"requirements_status": requirements_status,
|
||||
"total_requirements": total_requirements,
|
||||
|
||||
@@ -13137,8 +13137,59 @@ paths:
|
||||
responses:
|
||||
'200':
|
||||
description: CSV file containing the compliance report
|
||||
'202':
|
||||
description: The task is in progress
|
||||
'403':
|
||||
description: There is a problem with credentials
|
||||
'404':
|
||||
description: Compliance report not found
|
||||
description: Compliance report not found, or the scan has no reports yet
|
||||
/api/v1/scans/{id}/compliance/{name}/ocsf:
|
||||
get:
|
||||
operationId: scans_compliance_ocsf_retrieve
|
||||
description: Download a specific compliance report as an OCSF JSON file. Only
|
||||
universal frameworks that declare an output configuration produce this artifact
|
||||
(currently 'dora' and 'csa_ccm_4.0'); any other framework returns 404.
|
||||
summary: Retrieve compliance report as OCSF JSON
|
||||
parameters:
|
||||
- in: query
|
||||
name: fields[scan-reports]
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
enum:
|
||||
- id
|
||||
- name
|
||||
description: endpoint return only specific fields in the response on a per-type
|
||||
basis by including a fields[TYPE] query parameter.
|
||||
explode: false
|
||||
- in: path
|
||||
name: id
|
||||
schema:
|
||||
type: string
|
||||
format: uuid
|
||||
description: A UUID string identifying this scan.
|
||||
required: true
|
||||
- in: path
|
||||
name: name
|
||||
schema:
|
||||
type: string
|
||||
description: The compliance report name, like 'dora'
|
||||
required: true
|
||||
tags:
|
||||
- Scan
|
||||
security:
|
||||
- JWT or API Key: []
|
||||
responses:
|
||||
'200':
|
||||
description: OCSF JSON file containing the compliance report
|
||||
'202':
|
||||
description: The task is in progress
|
||||
'403':
|
||||
description: There is a problem with credentials
|
||||
'404':
|
||||
description: Compliance report not found, the framework does not provide
|
||||
an OCSF export, or the scan has no reports yet
|
||||
/api/v1/scans/{id}/csa:
|
||||
get:
|
||||
operationId: scans_csa_retrieve
|
||||
|
||||
@@ -12,7 +12,9 @@ from api.compliance import (
|
||||
load_prowler_checks,
|
||||
)
|
||||
from api.models import Provider
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.check.compliance_models import (
|
||||
get_bulk_compliance_frameworks_universal,
|
||||
)
|
||||
|
||||
|
||||
class TestCompliance:
|
||||
@@ -28,16 +30,16 @@ class TestCompliance:
|
||||
assert set(checks) == {"check1", "check2", "check3"}
|
||||
mock_check_metadata.get_bulk.assert_called_once_with(provider_type)
|
||||
|
||||
@patch("api.compliance.Compliance")
|
||||
def test_get_prowler_provider_compliance(self, mock_compliance):
|
||||
@patch("api.compliance.get_bulk_compliance_frameworks_universal")
|
||||
def test_get_prowler_provider_compliance(self, mock_get_bulk):
|
||||
provider_type = Provider.ProviderChoices.AWS
|
||||
mock_compliance.get_bulk.return_value = {
|
||||
mock_get_bulk.return_value = {
|
||||
"compliance1": MagicMock(),
|
||||
"compliance2": MagicMock(),
|
||||
}
|
||||
compliance_data = get_prowler_provider_compliance(provider_type)
|
||||
assert compliance_data == mock_compliance.get_bulk.return_value
|
||||
mock_compliance.get_bulk.assert_called_once_with(provider_type)
|
||||
assert compliance_data == mock_get_bulk.return_value
|
||||
mock_get_bulk.assert_called_once_with(provider_type)
|
||||
|
||||
@patch("api.compliance.get_prowler_provider_checks")
|
||||
@patch("api.models.Provider.ProviderChoices")
|
||||
@@ -51,9 +53,9 @@ class TestCompliance:
|
||||
prowler_compliance = {
|
||||
"aws": {
|
||||
"compliance1": MagicMock(
|
||||
Requirements=[
|
||||
requirements=[
|
||||
MagicMock(
|
||||
Checks=["check1", "check2"],
|
||||
checks={"aws": ["check1", "check2"]},
|
||||
),
|
||||
],
|
||||
),
|
||||
@@ -167,35 +169,38 @@ class TestCompliance:
|
||||
def test_generate_compliance_overview_template(self, mock_provider_choices):
|
||||
mock_provider_choices.values = ["aws"]
|
||||
|
||||
# ``name`` is a reserved MagicMock kwarg (it labels the mock for repr,
|
||||
# it does NOT set a ``.name`` attribute), so it must be assigned
|
||||
# explicitly after construction.
|
||||
requirement1 = MagicMock(
|
||||
Id="requirement1",
|
||||
Name="Requirement 1",
|
||||
Description="Description of requirement 1",
|
||||
Attributes=[],
|
||||
Checks=["check1", "check2"],
|
||||
Tactics=["tactic1"],
|
||||
SubTechniques=["subtechnique1"],
|
||||
Platforms=["platform1"],
|
||||
TechniqueURL="https://example.com",
|
||||
id="requirement1",
|
||||
description="Description of requirement 1",
|
||||
attributes=[],
|
||||
checks={"aws": ["check1", "check2"]},
|
||||
tactics=["tactic1"],
|
||||
sub_techniques=["subtechnique1"],
|
||||
platforms=["platform1"],
|
||||
technique_url="https://example.com",
|
||||
)
|
||||
requirement1.name = "Requirement 1"
|
||||
requirement2 = MagicMock(
|
||||
Id="requirement2",
|
||||
Name="Requirement 2",
|
||||
Description="Description of requirement 2",
|
||||
Attributes=[],
|
||||
Checks=[],
|
||||
Tactics=[],
|
||||
SubTechniques=[],
|
||||
Platforms=[],
|
||||
TechniqueURL="",
|
||||
id="requirement2",
|
||||
description="Description of requirement 2",
|
||||
attributes=[],
|
||||
checks={"aws": []},
|
||||
tactics=[],
|
||||
sub_techniques=[],
|
||||
platforms=[],
|
||||
technique_url="",
|
||||
)
|
||||
requirement2.name = "Requirement 2"
|
||||
compliance1 = MagicMock(
|
||||
Requirements=[requirement1, requirement2],
|
||||
Framework="Framework 1",
|
||||
Version="1.0",
|
||||
Description="Description of compliance1",
|
||||
Name="Compliance 1",
|
||||
requirements=[requirement1, requirement2],
|
||||
framework="Framework 1",
|
||||
version="1.0",
|
||||
description="Description of compliance1",
|
||||
)
|
||||
compliance1.name = "Compliance 1"
|
||||
prowler_compliance = {"aws": {"compliance1": compliance1}}
|
||||
|
||||
template = generate_compliance_overview_template(prowler_compliance)
|
||||
@@ -271,24 +276,28 @@ def reset_compliance_cache():
|
||||
|
||||
class TestGetComplianceFrameworks:
|
||||
def test_returns_keys_from_compliance_get_bulk(self, reset_compliance_cache):
|
||||
with patch("api.compliance.Compliance") as mock_compliance:
|
||||
mock_compliance.get_bulk.return_value = {
|
||||
with patch(
|
||||
"api.compliance.get_bulk_compliance_frameworks_universal"
|
||||
) as mock_get_bulk:
|
||||
mock_get_bulk.return_value = {
|
||||
"cis_1.4_aws": MagicMock(),
|
||||
"mitre_attack_aws": MagicMock(),
|
||||
}
|
||||
result = get_compliance_frameworks(Provider.ProviderChoices.AWS)
|
||||
|
||||
assert sorted(result) == ["cis_1.4_aws", "mitre_attack_aws"]
|
||||
mock_compliance.get_bulk.assert_called_once_with(Provider.ProviderChoices.AWS)
|
||||
mock_get_bulk.assert_called_once_with(Provider.ProviderChoices.AWS)
|
||||
|
||||
def test_caches_result_per_provider(self, reset_compliance_cache):
|
||||
with patch("api.compliance.Compliance") as mock_compliance:
|
||||
mock_compliance.get_bulk.return_value = {"cis_1.4_aws": MagicMock()}
|
||||
with patch(
|
||||
"api.compliance.get_bulk_compliance_frameworks_universal"
|
||||
) as mock_get_bulk:
|
||||
mock_get_bulk.return_value = {"cis_1.4_aws": MagicMock()}
|
||||
get_compliance_frameworks(Provider.ProviderChoices.AWS)
|
||||
get_compliance_frameworks(Provider.ProviderChoices.AWS)
|
||||
|
||||
# Cached after first call.
|
||||
assert mock_compliance.get_bulk.call_count == 1
|
||||
assert mock_get_bulk.call_count == 1
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"provider_type",
|
||||
@@ -296,17 +305,19 @@ class TestGetComplianceFrameworks:
|
||||
)
|
||||
def test_listing_is_subset_of_bulk(self, reset_compliance_cache, provider_type):
|
||||
"""Regression for CLOUD-API-40S: every name returned by
|
||||
``get_compliance_frameworks`` must be loadable via ``Compliance.get_bulk``.
|
||||
``get_compliance_frameworks`` must be loadable via
|
||||
``get_bulk_compliance_frameworks_universal``.
|
||||
|
||||
A divergence here is what produced ``KeyError: 'csa_ccm_4.0'`` in
|
||||
``generate_outputs_task`` after universal/multi-provider compliance
|
||||
JSONs were introduced at the top-level ``prowler/compliance/`` path.
|
||||
"""
|
||||
bulk_keys = set(Compliance.get_bulk(provider_type).keys())
|
||||
bulk_keys = set(get_bulk_compliance_frameworks_universal(provider_type).keys())
|
||||
listed = set(get_compliance_frameworks(provider_type))
|
||||
|
||||
missing = listed - bulk_keys
|
||||
assert not missing, (
|
||||
f"get_compliance_frameworks({provider_type!r}) returned names not "
|
||||
f"loadable by Compliance.get_bulk: {sorted(missing)}"
|
||||
f"loadable by get_bulk_compliance_frameworks_universal: "
|
||||
f"{sorted(missing)}"
|
||||
)
|
||||
|
||||
@@ -9560,6 +9560,16 @@ class TestComplianceOverviewViewSet:
|
||||
assert "platforms" in attributes["attributes"]["technique_details"]
|
||||
assert "technique_url" in attributes["attributes"]["technique_details"]
|
||||
|
||||
# Guard against the `_raw_attributes` wrapper leaking through —
|
||||
# the UI reads metadata[i].Category / .AWSService directly.
|
||||
metadata = attributes["attributes"]["metadata"]
|
||||
assert isinstance(metadata, list) and len(metadata) > 0
|
||||
first_attr = metadata[0]
|
||||
assert isinstance(first_attr, dict)
|
||||
assert "_raw_attributes" not in first_attr
|
||||
assert "Category" in first_attr
|
||||
assert "AWSService" in first_attr
|
||||
|
||||
def test_compliance_overview_attributes_missing_compliance_id(
|
||||
self, authenticated_client
|
||||
):
|
||||
|
||||
+105
-44
@@ -116,6 +116,7 @@ from api.base_views import BaseRLSViewSet, BaseTenantViewset, BaseUserViewset
|
||||
from api.compliance import (
|
||||
PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE,
|
||||
get_compliance_frameworks,
|
||||
get_prowler_provider_compliance,
|
||||
)
|
||||
from api.constants import SEVERITY_ORDER
|
||||
from api.db_router import MainRouter
|
||||
@@ -1849,7 +1850,42 @@ class ProviderViewSet(DisablePaginationMixin, BaseRLSViewSet):
|
||||
200: OpenApiResponse(
|
||||
description="CSV file containing the compliance report"
|
||||
),
|
||||
404: OpenApiResponse(description="Compliance report not found"),
|
||||
202: OpenApiResponse(description="The task is in progress"),
|
||||
403: OpenApiResponse(description="There is a problem with credentials"),
|
||||
404: OpenApiResponse(
|
||||
description="Compliance report not found, or the scan has no reports yet"
|
||||
),
|
||||
},
|
||||
request=None,
|
||||
),
|
||||
compliance_ocsf=extend_schema(
|
||||
tags=["Scan"],
|
||||
summary="Retrieve compliance report as OCSF JSON",
|
||||
description=(
|
||||
"Download a specific compliance report as an OCSF JSON file. "
|
||||
"Only universal frameworks that declare an output configuration "
|
||||
"produce this artifact (currently 'dora' and 'csa_ccm_4.0'); any "
|
||||
"other framework returns 404."
|
||||
),
|
||||
parameters=[
|
||||
OpenApiParameter(
|
||||
name="name",
|
||||
type=str,
|
||||
location=OpenApiParameter.PATH,
|
||||
required=True,
|
||||
description="The compliance report name, like 'dora'",
|
||||
),
|
||||
],
|
||||
responses={
|
||||
200: OpenApiResponse(
|
||||
description="OCSF JSON file containing the compliance report"
|
||||
),
|
||||
202: OpenApiResponse(description="The task is in progress"),
|
||||
403: OpenApiResponse(description="There is a problem with credentials"),
|
||||
404: OpenApiResponse(
|
||||
description="Compliance report not found, the framework does "
|
||||
"not provide an OCSF export, or the scan has no reports yet"
|
||||
),
|
||||
},
|
||||
request=None,
|
||||
),
|
||||
@@ -1992,35 +2028,23 @@ class ScanViewSet(BaseRLSViewSet):
|
||||
return queryset.select_related("provider", "task")
|
||||
|
||||
def get_serializer_class(self):
|
||||
if self.action == "create":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
return ScanCreateSerializer
|
||||
elif self.action == "partial_update":
|
||||
if self.action == "partial_update":
|
||||
return ScanUpdateSerializer
|
||||
elif self.action == "report":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
return ScanReportSerializer
|
||||
elif self.action == "compliance":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
return ScanComplianceReportSerializer
|
||||
elif self.action == "threatscore":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
elif self.action == "ens":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
elif self.action == "nis2":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
elif self.action == "csa":
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
elif self.action == "cis":
|
||||
|
||||
action_defaults = {
|
||||
"create": ScanCreateSerializer,
|
||||
"report": ScanReportSerializer,
|
||||
"compliance": ScanComplianceReportSerializer,
|
||||
"compliance_ocsf": ScanComplianceReportSerializer,
|
||||
}
|
||||
response_only_actions = {"threatscore", "ens", "nis2", "csa", "cis"}
|
||||
|
||||
if self.action in action_defaults or self.action in response_only_actions:
|
||||
if hasattr(self, "response_serializer_class"):
|
||||
return self.response_serializer_class
|
||||
if self.action in action_defaults:
|
||||
return action_defaults[self.action]
|
||||
|
||||
return super().get_serializer_class()
|
||||
|
||||
def partial_update(self, request, *args, **kwargs):
|
||||
@@ -2269,20 +2293,16 @@ class ScanViewSet(BaseRLSViewSet):
|
||||
content, filename = loader
|
||||
return self._serve_file(content, filename, "application/x-zip-compressed")
|
||||
|
||||
@action(
|
||||
detail=True,
|
||||
methods=["get"],
|
||||
url_path="compliance/(?P<name>[^/]+)",
|
||||
url_name="compliance",
|
||||
)
|
||||
def compliance(self, request, pk=None, name=None):
|
||||
scan = self.get_object()
|
||||
if name not in get_compliance_frameworks(scan.provider.provider):
|
||||
return Response(
|
||||
{"detail": f"Compliance '{name}' not found."},
|
||||
status=status.HTTP_404_NOT_FOUND,
|
||||
)
|
||||
def _serve_compliance_artifact(self, scan, name, file_extension, content_type):
|
||||
"""Resolve and serve a per-framework compliance artifact from disk/S3.
|
||||
|
||||
Shared by the CSV and OCSF compliance download actions. Both are
|
||||
path-based (no query params) on purpose: ``get_object`` runs
|
||||
``filter_queryset``, which triggers JSON:API's
|
||||
``QueryParameterValidationFilter`` and 400s on any non-JSON:API
|
||||
query param, so a ``?format=`` / ``?type=`` selector is not viable
|
||||
here — the format is encoded in the route instead.
|
||||
"""
|
||||
running_resp = self._get_task_status(scan)
|
||||
if running_resp:
|
||||
return running_resp
|
||||
@@ -2299,25 +2319,66 @@ class ScanViewSet(BaseRLSViewSet):
|
||||
bucket = env.str("DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET", "")
|
||||
key_prefix = scan.output_location.removeprefix(f"s3://{bucket}/")
|
||||
prefix = os.path.join(
|
||||
os.path.dirname(key_prefix), "compliance", f"{name}.csv"
|
||||
os.path.dirname(key_prefix), "compliance", f"{name}.{file_extension}"
|
||||
)
|
||||
loader = self._load_file(
|
||||
prefix,
|
||||
s3=True,
|
||||
bucket=bucket,
|
||||
list_objects=True,
|
||||
content_type="text/csv",
|
||||
content_type=content_type,
|
||||
)
|
||||
else:
|
||||
base = os.path.dirname(scan.output_location)
|
||||
pattern = os.path.join(base, "compliance", f"*_{name}.csv")
|
||||
pattern = os.path.join(base, "compliance", f"*_{name}.{file_extension}")
|
||||
loader = self._load_file(pattern, s3=False)
|
||||
|
||||
if isinstance(loader, HttpResponseBase):
|
||||
return loader
|
||||
|
||||
content, filename = loader
|
||||
return self._serve_file(content, filename, "text/csv")
|
||||
return self._serve_file(content, filename, content_type)
|
||||
|
||||
@action(
|
||||
detail=True,
|
||||
methods=["get"],
|
||||
url_path="compliance/(?P<name>[^/]+)",
|
||||
url_name="compliance",
|
||||
)
|
||||
def compliance(self, request, pk=None, name=None):
|
||||
scan = self.get_object()
|
||||
if name not in get_compliance_frameworks(scan.provider.provider):
|
||||
return Response(
|
||||
{"detail": f"Compliance '{name}' not found."},
|
||||
status=status.HTTP_404_NOT_FOUND,
|
||||
)
|
||||
return self._serve_compliance_artifact(scan, name, "csv", "text/csv")
|
||||
|
||||
@action(
|
||||
detail=True,
|
||||
methods=["get"],
|
||||
url_path="compliance/(?P<name>[^/]+)/ocsf",
|
||||
url_name="compliance-ocsf",
|
||||
)
|
||||
def compliance_ocsf(self, request, pk=None, name=None):
|
||||
scan = self.get_object()
|
||||
if name not in get_compliance_frameworks(scan.provider.provider):
|
||||
return Response(
|
||||
{"detail": f"Compliance '{name}' not found."},
|
||||
status=status.HTTP_404_NOT_FOUND,
|
||||
)
|
||||
|
||||
universal_bulk = get_prowler_provider_compliance(scan.provider.provider)
|
||||
framework_obj = universal_bulk.get(name)
|
||||
if not (framework_obj and getattr(framework_obj, "outputs", None)):
|
||||
return Response(
|
||||
{"detail": f"Compliance '{name}' does not provide an OCSF export."},
|
||||
status=status.HTTP_404_NOT_FOUND,
|
||||
)
|
||||
|
||||
return self._serve_compliance_artifact(
|
||||
scan, name, "ocsf.json", "application/json"
|
||||
)
|
||||
|
||||
@action(
|
||||
detail=True,
|
||||
|
||||
@@ -39,11 +39,6 @@ from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
|
||||
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
|
||||
GoogleWorkspaceCISASCuBA,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_azure import AzureCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_gcp import GCPCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
|
||||
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
|
||||
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
|
||||
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
|
||||
@@ -102,7 +97,6 @@ COMPLIANCE_CLASS_MAP = {
|
||||
(lambda name: name == "prowler_threatscore_aws", ProwlerThreatScoreAWS),
|
||||
(lambda name: name.startswith("ccc_"), CCC_AWS),
|
||||
(lambda name: name.startswith("c5_"), AWSC5),
|
||||
(lambda name: name.startswith("csa_"), AWSCSA),
|
||||
(lambda name: name == "asd_essential_eight_aws", ASDEssentialEightAWS),
|
||||
],
|
||||
"azure": [
|
||||
@@ -113,7 +107,6 @@ COMPLIANCE_CLASS_MAP = {
|
||||
(lambda name: name.startswith("ccc_"), CCC_Azure),
|
||||
(lambda name: name == "prowler_threatscore_azure", ProwlerThreatScoreAzure),
|
||||
(lambda name: name == "c5_azure", AzureC5),
|
||||
(lambda name: name.startswith("csa_"), AzureCSA),
|
||||
],
|
||||
"gcp": [
|
||||
(lambda name: name.startswith("cis_"), GCPCIS),
|
||||
@@ -123,7 +116,6 @@ COMPLIANCE_CLASS_MAP = {
|
||||
(lambda name: name == "prowler_threatscore_gcp", ProwlerThreatScoreGCP),
|
||||
(lambda name: name.startswith("ccc_"), CCC_GCP),
|
||||
(lambda name: name == "c5_gcp", GCPC5),
|
||||
(lambda name: name.startswith("csa_"), GCPCSA),
|
||||
],
|
||||
"kubernetes": [
|
||||
(lambda name: name.startswith("cis_"), KubernetesCIS),
|
||||
@@ -152,11 +144,9 @@ COMPLIANCE_CLASS_MAP = {
|
||||
"image": [],
|
||||
"oraclecloud": [
|
||||
(lambda name: name.startswith("cis_"), OracleCloudCIS),
|
||||
(lambda name: name.startswith("csa_"), OracleCloudCSA),
|
||||
],
|
||||
"alibabacloud": [
|
||||
(lambda name: name.startswith("cis_"), AlibabaCloudCIS),
|
||||
(lambda name: name.startswith("csa_"), AlibabaCloudCSA),
|
||||
(
|
||||
lambda name: name == "prowler_threatscore_alibabacloud",
|
||||
ProwlerThreatScoreAlibaba,
|
||||
|
||||
@@ -29,7 +29,10 @@ from api.db_router import READ_REPLICA_ALIAS, MainRouter
|
||||
from api.db_utils import rls_transaction
|
||||
from api.models import Provider, Scan, ScanSummary, StateChoices, ThreatScoreSnapshot
|
||||
from api.utils import initialize_prowler_provider
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.check.compliance_models import (
|
||||
Compliance,
|
||||
get_bulk_compliance_frameworks_universal,
|
||||
)
|
||||
from prowler.lib.outputs.finding import Finding as FindingOutput
|
||||
|
||||
logger = get_task_logger(__name__)
|
||||
@@ -571,7 +574,7 @@ def generate_csa_report(
|
||||
Args:
|
||||
tenant_id: The tenant ID for Row-Level Security context.
|
||||
scan_id: ID of the scan executed by Prowler.
|
||||
compliance_id: ID of the compliance framework (e.g., "csa_ccm_4.0_aws").
|
||||
compliance_id: ID of the compliance framework (e.g., "csa_ccm_4.0").
|
||||
output_path: Output PDF file path.
|
||||
provider_id: Provider ID for the scan.
|
||||
only_failed: If True, only include failed requirements in detailed section.
|
||||
@@ -883,9 +886,11 @@ def generate_compliance_reports(
|
||||
frameworks_bulk.get(f"nis2_{provider_type}")
|
||||
)
|
||||
if generate_csa:
|
||||
pending_checks_by_framework["csa"] = _get_compliance_check_ids(
|
||||
frameworks_bulk.get(f"csa_ccm_4.0_{provider_type}")
|
||||
)
|
||||
# csa_ccm_4.0 lives at the top level, not under compliance/{provider}/.
|
||||
csa_framework = frameworks_bulk.get(
|
||||
"csa_ccm_4.0"
|
||||
) or get_bulk_compliance_frameworks_universal(provider_type).get("csa_ccm_4.0")
|
||||
pending_checks_by_framework["csa"] = _get_compliance_check_ids(csa_framework)
|
||||
if generate_cis and latest_cis:
|
||||
pending_checks_by_framework["cis"] = _get_compliance_check_ids(
|
||||
frameworks_bulk.get(latest_cis)
|
||||
@@ -1183,7 +1188,7 @@ def generate_compliance_reports(
|
||||
if generate_csa:
|
||||
generated_report_keys.append("csa")
|
||||
csa_path = output_paths["csa"]
|
||||
compliance_id_csa = f"csa_ccm_4.0_{provider_type}"
|
||||
compliance_id_csa = "csa_ccm_4.0"
|
||||
pdf_path_csa = f"{csa_path}_csa_report.pdf"
|
||||
logger.info("Generating CSA CCM report with compliance %s", compliance_id_csa)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import time
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
@@ -26,7 +27,10 @@ from api.db_router import READ_REPLICA_ALIAS
|
||||
from api.db_utils import rls_transaction
|
||||
from api.models import Provider, StatusChoices
|
||||
from api.utils import initialize_prowler_provider
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.check.compliance_models import (
|
||||
Compliance,
|
||||
get_bulk_compliance_frameworks_universal,
|
||||
)
|
||||
from prowler.lib.outputs.finding import Finding as FindingOutput
|
||||
|
||||
from .components import (
|
||||
@@ -222,6 +226,46 @@ def get_requirement_metadata(
|
||||
return None
|
||||
|
||||
|
||||
def _universal_attributes_to_list(attributes) -> list:
|
||||
"""Flatten a universal requirement's ``attributes`` into a list of objects
|
||||
with attribute access. MITRE wraps its list under ``_raw_attributes``."""
|
||||
if isinstance(attributes, dict) and "_raw_attributes" in attributes:
|
||||
entries = attributes.get("_raw_attributes") or []
|
||||
return [
|
||||
SimpleNamespace(**entry) for entry in entries if isinstance(entry, dict)
|
||||
]
|
||||
if isinstance(attributes, dict):
|
||||
return [SimpleNamespace(**attributes)] if attributes else []
|
||||
return list(attributes or [])
|
||||
|
||||
|
||||
def _adapt_universal_to_legacy(framework, provider_type: str) -> SimpleNamespace:
|
||||
"""Expose a universal ``ComplianceFramework`` under the legacy ``Compliance``
|
||||
attribute names used by the PDF pipeline."""
|
||||
provider_key = (provider_type or "").lower()
|
||||
requirements = []
|
||||
for requirement in framework.requirements:
|
||||
checks_by_provider = (
|
||||
requirement.checks if isinstance(requirement.checks, dict) else {}
|
||||
)
|
||||
requirements.append(
|
||||
SimpleNamespace(
|
||||
Id=requirement.id,
|
||||
Description=requirement.description or "",
|
||||
Checks=list(checks_by_provider.get(provider_key, [])),
|
||||
Attributes=_universal_attributes_to_list(requirement.attributes),
|
||||
)
|
||||
)
|
||||
return SimpleNamespace(
|
||||
Framework=framework.framework,
|
||||
Name=framework.name,
|
||||
Version=framework.version or "",
|
||||
Description=framework.description or "",
|
||||
Provider=framework.provider or provider_type,
|
||||
Requirements=requirements,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# PDF Styles Cache
|
||||
# =============================================================================
|
||||
@@ -869,9 +913,18 @@ class BaseComplianceReportGenerator(ABC):
|
||||
prowler_provider = initialize_prowler_provider(provider_obj)
|
||||
provider_type = provider_obj.provider
|
||||
|
||||
# Load compliance framework
|
||||
frameworks_bulk = Compliance.get_bulk(provider_type)
|
||||
compliance_obj = frameworks_bulk.get(compliance_id)
|
||||
# Load compliance framework — fall back to the universal loader
|
||||
# for top-level JSONs (e.g. csa_ccm_4.0) that Compliance.get_bulk
|
||||
# does not scan.
|
||||
compliance_obj = Compliance.get_bulk(provider_type).get(compliance_id)
|
||||
if not compliance_obj:
|
||||
universal_framework = get_bulk_compliance_frameworks_universal(
|
||||
provider_type
|
||||
).get(compliance_id)
|
||||
if universal_framework:
|
||||
compliance_obj = _adapt_universal_to_legacy(
|
||||
universal_framework, provider_type
|
||||
)
|
||||
|
||||
if not compliance_obj:
|
||||
raise ValueError(f"Compliance framework not found: {compliance_id}")
|
||||
|
||||
@@ -359,35 +359,40 @@ def _load_findings_for_requirement_checks(
|
||||
def _get_compliance_check_ids(compliance_obj) -> set[str]:
|
||||
"""Return the union of all check_ids referenced by a compliance framework.
|
||||
|
||||
Used by the master report orchestrator to know which checks each
|
||||
framework consumes from the shared ``findings_cache``, so that once a
|
||||
framework finishes the entries no other pending framework needs can be
|
||||
evicted from the cache (PROWLER-1733).
|
||||
Used by the master report orchestrator to evict entries from
|
||||
``findings_cache`` once no pending framework needs them (PROWLER-1733).
|
||||
|
||||
Args:
|
||||
compliance_obj: A loaded Compliance framework object exposing a
|
||||
``Requirements`` iterable, each requirement carrying ``Checks``.
|
||||
``None`` is treated as "no checks" rather than raising, so the
|
||||
caller can pass ``frameworks_bulk.get(...)`` directly without
|
||||
an extra existence check.
|
||||
|
||||
Returns:
|
||||
Set of check_id strings (empty if ``compliance_obj`` is ``None``).
|
||||
Accepts the legacy ``Compliance`` shape (``Requirements`` / ``Checks``
|
||||
lists) and the universal ``ComplianceFramework`` shape (``requirements``
|
||||
/ ``checks`` dict keyed by provider). ``None`` returns an empty set so
|
||||
callers can pass ``frameworks_bulk.get(...)`` directly.
|
||||
"""
|
||||
if compliance_obj is None:
|
||||
return set()
|
||||
checks: set[str] = set()
|
||||
requirements = getattr(compliance_obj, "Requirements", None) or []
|
||||
|
||||
requirements = getattr(compliance_obj, "Requirements", None) or getattr(
|
||||
compliance_obj, "requirements", None
|
||||
)
|
||||
if not requirements:
|
||||
return set()
|
||||
|
||||
check_ids: set[str] = set()
|
||||
try:
|
||||
# Defensive: Mock objects (used in unit tests) return another Mock
|
||||
# for any attribute access, which is truthy but not iterable. Treat
|
||||
# any non-iterable Requirements value as "no checks".
|
||||
for req in requirements:
|
||||
req_checks = getattr(req, "Checks", None) or []
|
||||
# Mock objects in unit tests return another Mock for any attribute
|
||||
# access — truthy but not iterable. Treat that as "no checks".
|
||||
for requirement in requirements:
|
||||
requirement_checks = getattr(requirement, "Checks", None)
|
||||
if requirement_checks is None:
|
||||
checks_by_provider = getattr(requirement, "checks", None) or {}
|
||||
requirement_checks = [
|
||||
check_id
|
||||
for check_ids_list in checks_by_provider.values()
|
||||
for check_id in check_ids_list
|
||||
]
|
||||
try:
|
||||
checks.update(req_checks)
|
||||
check_ids.update(requirement_checks)
|
||||
except TypeError:
|
||||
continue
|
||||
except TypeError:
|
||||
return set()
|
||||
return checks
|
||||
return check_ids
|
||||
|
||||
@@ -68,7 +68,10 @@ from tasks.utils import (
|
||||
get_next_execution_datetime,
|
||||
)
|
||||
|
||||
from api.compliance import get_compliance_frameworks
|
||||
from api.compliance import (
|
||||
get_compliance_frameworks,
|
||||
get_prowler_provider_compliance,
|
||||
)
|
||||
from api.db_router import READ_REPLICA_ALIAS
|
||||
from api.db_utils import delete_related_daily_task, rls_transaction
|
||||
from api.decorators import handle_provider_deletion, set_tenant
|
||||
@@ -76,6 +79,9 @@ from api.models import Finding, Integration, Provider, Scan, ScanSummary, StateC
|
||||
from api.utils import initialize_prowler_provider
|
||||
from api.v1.serializers import ScanTaskSerializer
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.outputs.compliance.compliance import (
|
||||
process_universal_compliance_frameworks,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
|
||||
from prowler.lib.outputs.finding import Finding as FindingOutput
|
||||
|
||||
@@ -543,7 +549,16 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
|
||||
provider_uid = provider_obj.uid
|
||||
provider_type = provider_obj.provider
|
||||
|
||||
# Per-framework exporters in `COMPLIANCE_CLASS_MAP` consume the legacy bulk.
|
||||
frameworks_bulk = Compliance.get_bulk(provider_type)
|
||||
# Universal-only frameworks (top-level JSONs like `dora.json`) are emitted
|
||||
# via `process_universal_compliance_frameworks` below.
|
||||
universal_bulk = get_prowler_provider_compliance(provider_type)
|
||||
universal_only_names = {
|
||||
name
|
||||
for name in universal_bulk
|
||||
if name not in frameworks_bulk and universal_bulk[name].outputs
|
||||
}
|
||||
frameworks_avail = get_compliance_frameworks(provider_type)
|
||||
out_dir, comp_dir = _generate_output_directory(
|
||||
DJANGO_TMP_OUTPUT_DIRECTORY, provider_uid, tenant_id, scan_id
|
||||
@@ -568,6 +583,10 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
|
||||
|
||||
output_writers = {}
|
||||
compliance_writers = {}
|
||||
# Shared across batches so universal writers are created once and reused.
|
||||
universal_compliance_state: dict[str, list] = {"compliance": []}
|
||||
universal_base_dir = os.path.dirname(out_dir)
|
||||
universal_output_filename = os.path.basename(out_dir)
|
||||
|
||||
scan_summary = FindingOutput._transform_findings_stats(
|
||||
ScanSummary.objects.filter(scan_id=scan_id)
|
||||
@@ -622,8 +641,30 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
|
||||
writer.batch_write_data_to_file(**extra)
|
||||
writer._data.clear()
|
||||
|
||||
# Compliance CSVs
|
||||
# Universal-only frameworks (e.g. `dora.json`).
|
||||
if universal_only_names:
|
||||
process_universal_compliance_frameworks(
|
||||
input_compliance_frameworks=universal_only_names,
|
||||
universal_frameworks=universal_bulk,
|
||||
finding_outputs=fos,
|
||||
output_directory=universal_base_dir,
|
||||
output_filename=universal_output_filename,
|
||||
provider=provider_type,
|
||||
generated_outputs=universal_compliance_state,
|
||||
from_cli=False,
|
||||
is_last=is_last,
|
||||
)
|
||||
|
||||
# Compliance CSVs (per-framework exporters).
|
||||
for name in frameworks_avail:
|
||||
if name in universal_only_names:
|
||||
continue
|
||||
if name not in frameworks_bulk:
|
||||
logger.warning(
|
||||
"Compliance framework '%s' missing from bulk; skipping CSV export",
|
||||
name,
|
||||
)
|
||||
continue
|
||||
compliance_obj = frameworks_bulk[name]
|
||||
|
||||
klass = GenericCompliance
|
||||
|
||||
@@ -80,7 +80,7 @@ def basic_csa_compliance_data():
|
||||
tenant_id="tenant-123",
|
||||
scan_id="scan-456",
|
||||
provider_id="provider-789",
|
||||
compliance_id="csa_ccm_4.0_aws",
|
||||
compliance_id="csa_ccm_4.0",
|
||||
framework="CSA-CCM",
|
||||
name="CSA Cloud Controls Matrix v4.0",
|
||||
version="4.0",
|
||||
|
||||
@@ -323,6 +323,7 @@ class TestGenerateOutputs:
|
||||
|
||||
mock_transformed_stats = {"some": "stats"}
|
||||
with (
|
||||
patch("tasks.tasks.get_prowler_provider_compliance", return_value={}),
|
||||
patch(
|
||||
"tasks.tasks.FindingOutput._transform_findings_stats",
|
||||
return_value=mock_transformed_stats,
|
||||
@@ -441,6 +442,7 @@ class TestGenerateOutputs:
|
||||
mock_provider.uid = "test-provider-uid"
|
||||
|
||||
with (
|
||||
patch("tasks.tasks.get_prowler_provider_compliance", return_value={}),
|
||||
patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter,
|
||||
patch("tasks.tasks.Provider.objects.get", return_value=mock_provider),
|
||||
patch("tasks.tasks.initialize_prowler_provider"),
|
||||
@@ -596,6 +598,7 @@ class TestGenerateOutputs:
|
||||
]
|
||||
|
||||
with (
|
||||
patch("tasks.tasks.get_prowler_provider_compliance", return_value={}),
|
||||
patch("tasks.tasks.ScanSummary.objects.filter") as mock_summary,
|
||||
patch(
|
||||
"tasks.tasks.Provider.objects.get",
|
||||
@@ -670,6 +673,7 @@ class TestGenerateOutputs:
|
||||
mock_provider.uid = "test-provider-uid"
|
||||
|
||||
with (
|
||||
patch("tasks.tasks.get_prowler_provider_compliance", return_value={}),
|
||||
patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter,
|
||||
patch("tasks.tasks.Provider.objects.get", return_value=mock_provider),
|
||||
patch("tasks.tasks.initialize_prowler_provider"),
|
||||
@@ -1113,6 +1117,7 @@ class TestCheckIntegrationsTask:
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
@patch("tasks.tasks.get_prowler_provider_compliance", return_value={})
|
||||
@patch("tasks.tasks.s3_integration_task")
|
||||
@patch("tasks.tasks.Integration.objects.filter")
|
||||
@patch("tasks.tasks.ScanSummary.objects.filter")
|
||||
@@ -1145,6 +1150,7 @@ class TestCheckIntegrationsTask:
|
||||
mock_scan_summary,
|
||||
mock_integration_filter,
|
||||
mock_s3_task,
|
||||
mock_get_prowler_compliance,
|
||||
):
|
||||
"""Test that ASFF output is generated for AWS providers with SecurityHub integration."""
|
||||
# Setup
|
||||
@@ -1241,6 +1247,7 @@ class TestCheckIntegrationsTask:
|
||||
|
||||
assert result == {"upload": True}
|
||||
|
||||
@patch("tasks.tasks.get_prowler_provider_compliance", return_value={})
|
||||
@patch("tasks.tasks.s3_integration_task")
|
||||
@patch("tasks.tasks.Integration.objects.filter")
|
||||
@patch("tasks.tasks.ScanSummary.objects.filter")
|
||||
@@ -1273,6 +1280,7 @@ class TestCheckIntegrationsTask:
|
||||
mock_scan_summary,
|
||||
mock_integration_filter,
|
||||
mock_s3_task,
|
||||
mock_get_prowler_compliance,
|
||||
):
|
||||
"""Test that ASFF output is NOT generated for AWS providers without SecurityHub integration."""
|
||||
# Setup
|
||||
@@ -1366,6 +1374,7 @@ class TestCheckIntegrationsTask:
|
||||
|
||||
assert result == {"upload": True}
|
||||
|
||||
@patch("tasks.tasks.get_prowler_provider_compliance", return_value={})
|
||||
@patch("tasks.tasks.ScanSummary.objects.filter")
|
||||
@patch("tasks.tasks.Provider.objects.get")
|
||||
@patch("tasks.tasks.initialize_prowler_provider")
|
||||
@@ -1394,6 +1403,7 @@ class TestCheckIntegrationsTask:
|
||||
mock_initialize_provider,
|
||||
mock_provider_get,
|
||||
mock_scan_summary,
|
||||
mock_get_prowler_compliance,
|
||||
):
|
||||
"""Test that ASFF output is NOT generated for non-AWS providers (e.g., Azure, GCP)."""
|
||||
# Setup
|
||||
|
||||
Reference in New Issue
Block a user