From 206feeb5a8aedbe5d6ffbaa97654ebff6950b858 Mon Sep 17 00:00:00 2001 From: Daniel Barranquero Date: Fri, 20 Mar 2026 09:35:23 +0100 Subject: [PATCH] fix: accept legacy metadata from db --- api/src/backend/tasks/tests/test_tasks.py | 166 ++++++++++++++++++++++ prowler/lib/outputs/finding.py | 99 ++++++++----- tests/lib/outputs/finding_test.py | 89 ++++++++++++ 3 files changed, 318 insertions(+), 36 deletions(-) diff --git a/api/src/backend/tasks/tests/test_tasks.py b/api/src/backend/tasks/tests/test_tasks.py index 3a58118c62..284065e9aa 100644 --- a/api/src/backend/tasks/tests/test_tasks.py +++ b/api/src/backend/tasks/tests/test_tasks.py @@ -334,6 +334,172 @@ class TestGenerateOutputs: output_location="s3://bucket/zipped.zip" ) + @patch("tasks.tasks._upload_to_s3") + @patch("tasks.tasks._compress_output_files") + @patch("tasks.tasks.get_compliance_frameworks") + @patch("tasks.tasks.Compliance.get_bulk") + @patch("tasks.tasks.initialize_prowler_provider") + @patch("tasks.tasks.Provider.objects.get") + @patch("tasks.tasks.ScanSummary.objects.filter") + @patch("tasks.tasks.Finding.all_objects.filter") + def test_generate_outputs_accepts_legacy_persisted_check_metadata( + self, + mock_finding_filter, + mock_scan_summary_filter, + mock_provider_get, + mock_initialize_provider, + mock_compliance_get_bulk, + mock_get_available_frameworks, + mock_compress, + mock_upload, + ): + mock_scan_summary_filter.return_value.exists.return_value = True + + mock_provider = MagicMock() + mock_provider.uid = "azure-subscription-123" + mock_provider.provider = "azure" + mock_provider_get.return_value = mock_provider + + prowler_provider = MagicMock() + prowler_provider.type = "azure" + prowler_provider.identity.identity_type = "mock_identity_type" + prowler_provider.identity.identity_id = "mock_identity_id" + prowler_provider.identity.subscriptions = { + "legacy-subscription": "legacy-sub-id" + } + prowler_provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"] + prowler_provider.identity.tenant_domain = "mock_tenant_domain" + prowler_provider.region_config.name = "AzureCloud" + mock_initialize_provider.return_value = prowler_provider + + mock_compliance_get_bulk.return_value = {} + mock_get_available_frameworks.return_value = [] + + resource = MagicMock() + resource.uid = ( + "/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/" + "policyAssignments/legacy" + ) + resource.name = "legacy-policy" + resource.region = "global" + resource.metadata = "{}" + resource.details = "" + resource.tags.all.return_value = [MagicMock(key="env", value="prod")] + + dummy_finding = MagicMock() + dummy_finding.uid = "finding-uid-legacy" + dummy_finding.status = "FAIL" + dummy_finding.status_extended = "Legacy metadata finding" + dummy_finding.muted = False + dummy_finding.compliance = {} + dummy_finding.raw_result = {} + dummy_finding.check_id = ( + "entra_conditional_access_policy_require_mfa_for_management_api" + ) + dummy_finding.check_metadata = { + "provider": "azure", + "checkid": "entra_conditional_access_policy_require_mfa_for_management_api", + "checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API", + "checktype": [], + "servicename": "entra", + "subservicename": "", + "severity": "medium", + "resourcetype": "#microsoft.graph.conditionalAccess", + "resourcegroup": "IAM", + "description": "Legacy description", + "risk": "Legacy risk", + "relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management", + "additionalurls": [ + "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps" + ], + "remediation": { + "code": { + "cli": "", + "other": "", + "nativeiac": "", + "terraform": "", + }, + "recommendation": { + "text": "Legacy remediation", + "url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps", + }, + }, + "resourceidtemplate": "", + "categories": [], + "dependson": [], + "relatedto": [], + "notes": "Legacy notes", + } + dummy_finding.resources.first.return_value = resource + + mock_finding_filter.return_value.order_by.return_value.iterator.return_value = [ + dummy_finding + ] + + writer_instances = [] + + def writer_factory(*args, **kwargs): + writer = MagicMock() + writer._data = [] + writer.transform = MagicMock() + writer.batch_write_data_to_file = MagicMock() + writer.findings = kwargs["findings"] + writer_instances.append(writer) + return writer + + with ( + patch( + "tasks.tasks.FindingOutput._transform_findings_stats", + return_value={"some": "stats"}, + ), + patch( + "tasks.tasks.OUTPUT_FORMATS_MAPPING", + { + "json": { + "class": writer_factory, + "suffix": ".json", + "kwargs": {}, + } + }, + ), + patch( + "tasks.tasks._generate_output_directory", + return_value=( + "/tmp/test/out-dir", + "/tmp/test/comp-dir", + ), + ), + patch("tasks.tasks.Scan.all_objects.filter") as mock_scan_update, + patch("tasks.tasks.rmtree"), + ): + mock_compress.return_value = "/tmp/zipped.zip" + mock_upload.return_value = "s3://bucket/zipped.zip" + + result = generate_outputs_task( + scan_id=self.scan_id, + provider_id=self.provider_id, + tenant_id=self.tenant_id, + ) + + assert result == {"upload": True} + assert len(writer_instances) == 1 + + transformed_finding = writer_instances[0].findings[0] + assert transformed_finding.metadata.CheckTitle.startswith("Ensure") + assert ( + transformed_finding.metadata.RelatedUrl + == "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management" + ) + assert ( + transformed_finding.metadata.Remediation.Recommendation.Url + == "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps" + ) + assert transformed_finding.metadata.Severity.value == "medium" + + mock_scan_update.return_value.update.assert_called_once_with( + output_location="s3://bucket/zipped.zip" + ) + def test_generate_outputs_fails_upload(self): with ( patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter, diff --git a/prowler/lib/outputs/finding.py b/prowler/lib/outputs/finding.py index a93d20f063..c5457ba6d0 100644 --- a/prowler/lib/outputs/finding.py +++ b/prowler/lib/outputs/finding.py @@ -12,6 +12,7 @@ from prowler.lib.check.models import ( Code, Recommendation, Remediation, + Severity, ) from prowler.lib.logger import logger from prowler.lib.outputs.common import Status, fill_common_finding_data @@ -536,48 +537,74 @@ class Finding(BaseModel): finding.zone_name = getattr(resource, "zone_name", resource.name) finding.account_id = getattr(finding, "account_id", "") - finding.check_metadata = CheckMetadata( - Provider=finding.check_metadata["provider"], - CheckID=finding.check_metadata["checkid"], - CheckTitle=finding.check_metadata["checktitle"], - CheckType=finding.check_metadata["checktype"], - ServiceName=finding.check_metadata["servicename"], - SubServiceName=finding.check_metadata["subservicename"], - Severity=finding.check_metadata["severity"], - ResourceType=finding.check_metadata["resourcetype"], - Description=finding.check_metadata["description"], - Risk=finding.check_metadata["risk"], - RelatedUrl=finding.check_metadata["relatedurl"], - Remediation=Remediation( - Recommendation=Recommendation( - Text=finding.check_metadata["remediation"]["recommendation"][ - "text" - ], - Url=finding.check_metadata["remediation"]["recommendation"]["url"], - ), - Code=Code( - NativeIaC=finding.check_metadata["remediation"]["code"][ - "nativeiac" - ], - Terraform=finding.check_metadata["remediation"]["code"][ - "terraform" - ], - CLI=finding.check_metadata["remediation"]["code"]["cli"], - Other=finding.check_metadata["remediation"]["code"]["other"], - ), - ), - ResourceIdTemplate=finding.check_metadata["resourceidtemplate"], - Categories=finding.check_metadata["categories"], - DependsOn=finding.check_metadata["dependson"], - RelatedTo=finding.check_metadata["relatedto"], - Notes=finding.check_metadata["notes"], - ) + metadata_kwargs = cls._get_api_check_metadata_kwargs(finding.check_metadata) + try: + finding.check_metadata = CheckMetadata(**metadata_kwargs) + except ValidationError as validation_error: + check_id = metadata_kwargs.get("CheckID", getattr(finding, "check_id", "")) + logger.warning( + "Legacy persisted check metadata failed validation during API finding transformation " + f"for {check_id}. Falling back to compatibility mode. Errors: {validation_error.errors()}" + ) + finding.check_metadata = cls._construct_legacy_check_metadata( + metadata_kwargs + ) finding.resource_tags = unroll_tags( [{"key": tag.key, "value": tag.value} for tag in resource.tags.all()] ) return cls.generate_output(provider, finding, SimpleNamespace()) + @staticmethod + def _get_api_check_metadata_kwargs(check_metadata: dict) -> dict: + remediation = check_metadata["remediation"] + remediation_code = remediation["code"] + remediation_recommendation = remediation["recommendation"] + + return { + "Provider": check_metadata["provider"], + "CheckID": check_metadata["checkid"], + "CheckTitle": check_metadata["checktitle"], + "CheckType": check_metadata["checktype"], + "CheckAliases": check_metadata.get("checkaliases", []), + "ServiceName": check_metadata["servicename"], + "SubServiceName": check_metadata["subservicename"], + "Severity": check_metadata["severity"], + "ResourceType": check_metadata["resourcetype"], + "ResourceGroup": check_metadata.get("resourcegroup", ""), + "Description": check_metadata["description"], + "Risk": check_metadata["risk"], + "RelatedUrl": check_metadata["relatedurl"], + "Remediation": Remediation( + Recommendation=Recommendation( + Text=remediation_recommendation["text"], + Url=remediation_recommendation["url"], + ), + Code=Code( + NativeIaC=remediation_code["nativeiac"], + Terraform=remediation_code["terraform"], + CLI=remediation_code["cli"], + Other=remediation_code["other"], + ), + ), + "ResourceIdTemplate": check_metadata["resourceidtemplate"], + "AdditionalURLs": check_metadata.get("additionalurls", []), + "Categories": check_metadata["categories"], + "DependsOn": check_metadata["dependson"], + "RelatedTo": check_metadata["relatedto"], + "Notes": check_metadata["notes"], + "Compliance": check_metadata.get("compliance", []), + } + + @staticmethod + def _construct_legacy_check_metadata(metadata_kwargs: dict) -> CheckMetadata: + severity = metadata_kwargs["Severity"] + if not isinstance(severity, Severity): + severity = Severity(severity) + + legacy_metadata_kwargs = {**metadata_kwargs, "Severity": severity} + return CheckMetadata.construct(**legacy_metadata_kwargs) + def _transform_findings_stats(scan_summaries: list[dict]) -> dict: """ Aggregate and transform scan summary data into findings statistics. diff --git a/tests/lib/outputs/finding_test.py b/tests/lib/outputs/finding_test.py index 00ce2fe1f7..4b7c5b6db3 100644 --- a/tests/lib/outputs/finding_test.py +++ b/tests/lib/outputs/finding_test.py @@ -1126,6 +1126,95 @@ class TestFinding: for segment in expected_segments: assert segment in finding_obj.uid + @patch( + "prowler.lib.outputs.finding.get_check_compliance", + new=mock_get_check_compliance, + ) + def test_transform_api_finding_azure_accepts_legacy_persisted_metadata(self): + provider = MagicMock() + provider.type = "azure" + provider.identity.identity_type = "mock_identity_type" + provider.identity.identity_id = "mock_identity_id" + provider.identity.subscriptions = {"legacy-subscription": "legacy-sub-id"} + provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"] + provider.identity.tenant_domain = "mock_tenant_domain" + provider.region_config.name = "AzureCloud" + + api_finding = DummyAPIFinding() + api_finding.uid = "finding-uid-legacy" + api_finding.status = "FAIL" + api_finding.status_extended = "Legacy metadata finding" + api_finding.check_id = ( + "entra_conditional_access_policy_require_mfa_for_management_api" + ) + api_finding.check_metadata = { + "provider": "azure", + "checkid": "entra_conditional_access_policy_require_mfa_for_management_api", + "checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API", + "checktype": [], + "servicename": "entra", + "subservicename": "", + "severity": "medium", + "resourcetype": "#microsoft.graph.conditionalAccess", + "resourcegroup": "IAM", + "description": "Legacy description", + "risk": "Legacy risk", + "relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management", + "additionalurls": [ + "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps" + ], + "remediation": { + "code": { + "cli": "", + "other": "", + "nativeiac": "", + "terraform": "", + }, + "recommendation": { + "text": "Legacy remediation", + "url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps", + }, + }, + "resourceidtemplate": "", + "categories": [], + "dependson": [], + "relatedto": [], + "notes": "Legacy notes", + } + api_finding.muted = False + api_resource = DummyResource( + uid="/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/policyAssignments/legacy", + name="legacy-policy", + resource_arn="arn", + region="global", + tags=[], + ) + api_finding.resources = DummyResources(api_resource) + + finding_obj = Finding.transform_api_finding(api_finding, provider) + + assert finding_obj.account_uid == "legacy-sub-id" + assert finding_obj.resource_uid == api_resource.uid + assert finding_obj.resource_name == api_resource.name + + meta = finding_obj.metadata + assert ( + meta.CheckTitle + == "Ensure Multifactor Authentication is Required for Windows Azure Service Management API" + ) + assert ( + meta.RelatedUrl + == "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management" + ) + assert ( + meta.Remediation.Recommendation.Url + == "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps" + ) + assert meta.ResourceGroup == "IAM" + assert meta.AdditionalURLs == [ + "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps" + ] + @patch( "prowler.lib.outputs.finding.get_check_compliance", new=mock_get_check_compliance,