mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-22 03:08:23 +00:00
Compare commits
1 Commits
f8bededc9b
...
fix/legacy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
206feeb5a8 |
@@ -334,6 +334,172 @@ class TestGenerateOutputs:
|
||||
output_location="s3://bucket/zipped.zip"
|
||||
)
|
||||
|
||||
@patch("tasks.tasks._upload_to_s3")
|
||||
@patch("tasks.tasks._compress_output_files")
|
||||
@patch("tasks.tasks.get_compliance_frameworks")
|
||||
@patch("tasks.tasks.Compliance.get_bulk")
|
||||
@patch("tasks.tasks.initialize_prowler_provider")
|
||||
@patch("tasks.tasks.Provider.objects.get")
|
||||
@patch("tasks.tasks.ScanSummary.objects.filter")
|
||||
@patch("tasks.tasks.Finding.all_objects.filter")
|
||||
def test_generate_outputs_accepts_legacy_persisted_check_metadata(
|
||||
self,
|
||||
mock_finding_filter,
|
||||
mock_scan_summary_filter,
|
||||
mock_provider_get,
|
||||
mock_initialize_provider,
|
||||
mock_compliance_get_bulk,
|
||||
mock_get_available_frameworks,
|
||||
mock_compress,
|
||||
mock_upload,
|
||||
):
|
||||
mock_scan_summary_filter.return_value.exists.return_value = True
|
||||
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.uid = "azure-subscription-123"
|
||||
mock_provider.provider = "azure"
|
||||
mock_provider_get.return_value = mock_provider
|
||||
|
||||
prowler_provider = MagicMock()
|
||||
prowler_provider.type = "azure"
|
||||
prowler_provider.identity.identity_type = "mock_identity_type"
|
||||
prowler_provider.identity.identity_id = "mock_identity_id"
|
||||
prowler_provider.identity.subscriptions = {
|
||||
"legacy-subscription": "legacy-sub-id"
|
||||
}
|
||||
prowler_provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"]
|
||||
prowler_provider.identity.tenant_domain = "mock_tenant_domain"
|
||||
prowler_provider.region_config.name = "AzureCloud"
|
||||
mock_initialize_provider.return_value = prowler_provider
|
||||
|
||||
mock_compliance_get_bulk.return_value = {}
|
||||
mock_get_available_frameworks.return_value = []
|
||||
|
||||
resource = MagicMock()
|
||||
resource.uid = (
|
||||
"/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/"
|
||||
"policyAssignments/legacy"
|
||||
)
|
||||
resource.name = "legacy-policy"
|
||||
resource.region = "global"
|
||||
resource.metadata = "{}"
|
||||
resource.details = ""
|
||||
resource.tags.all.return_value = [MagicMock(key="env", value="prod")]
|
||||
|
||||
dummy_finding = MagicMock()
|
||||
dummy_finding.uid = "finding-uid-legacy"
|
||||
dummy_finding.status = "FAIL"
|
||||
dummy_finding.status_extended = "Legacy metadata finding"
|
||||
dummy_finding.muted = False
|
||||
dummy_finding.compliance = {}
|
||||
dummy_finding.raw_result = {}
|
||||
dummy_finding.check_id = (
|
||||
"entra_conditional_access_policy_require_mfa_for_management_api"
|
||||
)
|
||||
dummy_finding.check_metadata = {
|
||||
"provider": "azure",
|
||||
"checkid": "entra_conditional_access_policy_require_mfa_for_management_api",
|
||||
"checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API",
|
||||
"checktype": [],
|
||||
"servicename": "entra",
|
||||
"subservicename": "",
|
||||
"severity": "medium",
|
||||
"resourcetype": "#microsoft.graph.conditionalAccess",
|
||||
"resourcegroup": "IAM",
|
||||
"description": "Legacy description",
|
||||
"risk": "Legacy risk",
|
||||
"relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management",
|
||||
"additionalurls": [
|
||||
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
|
||||
],
|
||||
"remediation": {
|
||||
"code": {
|
||||
"cli": "",
|
||||
"other": "",
|
||||
"nativeiac": "",
|
||||
"terraform": "",
|
||||
},
|
||||
"recommendation": {
|
||||
"text": "Legacy remediation",
|
||||
"url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps",
|
||||
},
|
||||
},
|
||||
"resourceidtemplate": "",
|
||||
"categories": [],
|
||||
"dependson": [],
|
||||
"relatedto": [],
|
||||
"notes": "Legacy notes",
|
||||
}
|
||||
dummy_finding.resources.first.return_value = resource
|
||||
|
||||
mock_finding_filter.return_value.order_by.return_value.iterator.return_value = [
|
||||
dummy_finding
|
||||
]
|
||||
|
||||
writer_instances = []
|
||||
|
||||
def writer_factory(*args, **kwargs):
|
||||
writer = MagicMock()
|
||||
writer._data = []
|
||||
writer.transform = MagicMock()
|
||||
writer.batch_write_data_to_file = MagicMock()
|
||||
writer.findings = kwargs["findings"]
|
||||
writer_instances.append(writer)
|
||||
return writer
|
||||
|
||||
with (
|
||||
patch(
|
||||
"tasks.tasks.FindingOutput._transform_findings_stats",
|
||||
return_value={"some": "stats"},
|
||||
),
|
||||
patch(
|
||||
"tasks.tasks.OUTPUT_FORMATS_MAPPING",
|
||||
{
|
||||
"json": {
|
||||
"class": writer_factory,
|
||||
"suffix": ".json",
|
||||
"kwargs": {},
|
||||
}
|
||||
},
|
||||
),
|
||||
patch(
|
||||
"tasks.tasks._generate_output_directory",
|
||||
return_value=(
|
||||
"/tmp/test/out-dir",
|
||||
"/tmp/test/comp-dir",
|
||||
),
|
||||
),
|
||||
patch("tasks.tasks.Scan.all_objects.filter") as mock_scan_update,
|
||||
patch("tasks.tasks.rmtree"),
|
||||
):
|
||||
mock_compress.return_value = "/tmp/zipped.zip"
|
||||
mock_upload.return_value = "s3://bucket/zipped.zip"
|
||||
|
||||
result = generate_outputs_task(
|
||||
scan_id=self.scan_id,
|
||||
provider_id=self.provider_id,
|
||||
tenant_id=self.tenant_id,
|
||||
)
|
||||
|
||||
assert result == {"upload": True}
|
||||
assert len(writer_instances) == 1
|
||||
|
||||
transformed_finding = writer_instances[0].findings[0]
|
||||
assert transformed_finding.metadata.CheckTitle.startswith("Ensure")
|
||||
assert (
|
||||
transformed_finding.metadata.RelatedUrl
|
||||
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management"
|
||||
)
|
||||
assert (
|
||||
transformed_finding.metadata.Remediation.Recommendation.Url
|
||||
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
|
||||
)
|
||||
assert transformed_finding.metadata.Severity.value == "medium"
|
||||
|
||||
mock_scan_update.return_value.update.assert_called_once_with(
|
||||
output_location="s3://bucket/zipped.zip"
|
||||
)
|
||||
|
||||
def test_generate_outputs_fails_upload(self):
|
||||
with (
|
||||
patch("tasks.tasks.ScanSummary.objects.filter") as mock_filter,
|
||||
|
||||
@@ -12,6 +12,7 @@ from prowler.lib.check.models import (
|
||||
Code,
|
||||
Recommendation,
|
||||
Remediation,
|
||||
Severity,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.common import Status, fill_common_finding_data
|
||||
@@ -536,48 +537,74 @@ class Finding(BaseModel):
|
||||
finding.zone_name = getattr(resource, "zone_name", resource.name)
|
||||
finding.account_id = getattr(finding, "account_id", "")
|
||||
|
||||
finding.check_metadata = CheckMetadata(
|
||||
Provider=finding.check_metadata["provider"],
|
||||
CheckID=finding.check_metadata["checkid"],
|
||||
CheckTitle=finding.check_metadata["checktitle"],
|
||||
CheckType=finding.check_metadata["checktype"],
|
||||
ServiceName=finding.check_metadata["servicename"],
|
||||
SubServiceName=finding.check_metadata["subservicename"],
|
||||
Severity=finding.check_metadata["severity"],
|
||||
ResourceType=finding.check_metadata["resourcetype"],
|
||||
Description=finding.check_metadata["description"],
|
||||
Risk=finding.check_metadata["risk"],
|
||||
RelatedUrl=finding.check_metadata["relatedurl"],
|
||||
Remediation=Remediation(
|
||||
Recommendation=Recommendation(
|
||||
Text=finding.check_metadata["remediation"]["recommendation"][
|
||||
"text"
|
||||
],
|
||||
Url=finding.check_metadata["remediation"]["recommendation"]["url"],
|
||||
),
|
||||
Code=Code(
|
||||
NativeIaC=finding.check_metadata["remediation"]["code"][
|
||||
"nativeiac"
|
||||
],
|
||||
Terraform=finding.check_metadata["remediation"]["code"][
|
||||
"terraform"
|
||||
],
|
||||
CLI=finding.check_metadata["remediation"]["code"]["cli"],
|
||||
Other=finding.check_metadata["remediation"]["code"]["other"],
|
||||
),
|
||||
),
|
||||
ResourceIdTemplate=finding.check_metadata["resourceidtemplate"],
|
||||
Categories=finding.check_metadata["categories"],
|
||||
DependsOn=finding.check_metadata["dependson"],
|
||||
RelatedTo=finding.check_metadata["relatedto"],
|
||||
Notes=finding.check_metadata["notes"],
|
||||
)
|
||||
metadata_kwargs = cls._get_api_check_metadata_kwargs(finding.check_metadata)
|
||||
try:
|
||||
finding.check_metadata = CheckMetadata(**metadata_kwargs)
|
||||
except ValidationError as validation_error:
|
||||
check_id = metadata_kwargs.get("CheckID", getattr(finding, "check_id", ""))
|
||||
logger.warning(
|
||||
"Legacy persisted check metadata failed validation during API finding transformation "
|
||||
f"for {check_id}. Falling back to compatibility mode. Errors: {validation_error.errors()}"
|
||||
)
|
||||
finding.check_metadata = cls._construct_legacy_check_metadata(
|
||||
metadata_kwargs
|
||||
)
|
||||
finding.resource_tags = unroll_tags(
|
||||
[{"key": tag.key, "value": tag.value} for tag in resource.tags.all()]
|
||||
)
|
||||
|
||||
return cls.generate_output(provider, finding, SimpleNamespace())
|
||||
|
||||
@staticmethod
|
||||
def _get_api_check_metadata_kwargs(check_metadata: dict) -> dict:
|
||||
remediation = check_metadata["remediation"]
|
||||
remediation_code = remediation["code"]
|
||||
remediation_recommendation = remediation["recommendation"]
|
||||
|
||||
return {
|
||||
"Provider": check_metadata["provider"],
|
||||
"CheckID": check_metadata["checkid"],
|
||||
"CheckTitle": check_metadata["checktitle"],
|
||||
"CheckType": check_metadata["checktype"],
|
||||
"CheckAliases": check_metadata.get("checkaliases", []),
|
||||
"ServiceName": check_metadata["servicename"],
|
||||
"SubServiceName": check_metadata["subservicename"],
|
||||
"Severity": check_metadata["severity"],
|
||||
"ResourceType": check_metadata["resourcetype"],
|
||||
"ResourceGroup": check_metadata.get("resourcegroup", ""),
|
||||
"Description": check_metadata["description"],
|
||||
"Risk": check_metadata["risk"],
|
||||
"RelatedUrl": check_metadata["relatedurl"],
|
||||
"Remediation": Remediation(
|
||||
Recommendation=Recommendation(
|
||||
Text=remediation_recommendation["text"],
|
||||
Url=remediation_recommendation["url"],
|
||||
),
|
||||
Code=Code(
|
||||
NativeIaC=remediation_code["nativeiac"],
|
||||
Terraform=remediation_code["terraform"],
|
||||
CLI=remediation_code["cli"],
|
||||
Other=remediation_code["other"],
|
||||
),
|
||||
),
|
||||
"ResourceIdTemplate": check_metadata["resourceidtemplate"],
|
||||
"AdditionalURLs": check_metadata.get("additionalurls", []),
|
||||
"Categories": check_metadata["categories"],
|
||||
"DependsOn": check_metadata["dependson"],
|
||||
"RelatedTo": check_metadata["relatedto"],
|
||||
"Notes": check_metadata["notes"],
|
||||
"Compliance": check_metadata.get("compliance", []),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _construct_legacy_check_metadata(metadata_kwargs: dict) -> CheckMetadata:
|
||||
severity = metadata_kwargs["Severity"]
|
||||
if not isinstance(severity, Severity):
|
||||
severity = Severity(severity)
|
||||
|
||||
legacy_metadata_kwargs = {**metadata_kwargs, "Severity": severity}
|
||||
return CheckMetadata.construct(**legacy_metadata_kwargs)
|
||||
|
||||
def _transform_findings_stats(scan_summaries: list[dict]) -> dict:
|
||||
"""
|
||||
Aggregate and transform scan summary data into findings statistics.
|
||||
|
||||
@@ -1126,6 +1126,95 @@ class TestFinding:
|
||||
for segment in expected_segments:
|
||||
assert segment in finding_obj.uid
|
||||
|
||||
@patch(
|
||||
"prowler.lib.outputs.finding.get_check_compliance",
|
||||
new=mock_get_check_compliance,
|
||||
)
|
||||
def test_transform_api_finding_azure_accepts_legacy_persisted_metadata(self):
|
||||
provider = MagicMock()
|
||||
provider.type = "azure"
|
||||
provider.identity.identity_type = "mock_identity_type"
|
||||
provider.identity.identity_id = "mock_identity_id"
|
||||
provider.identity.subscriptions = {"legacy-subscription": "legacy-sub-id"}
|
||||
provider.identity.tenant_ids = ["test-ing-432a-a828-d9c965196f87"]
|
||||
provider.identity.tenant_domain = "mock_tenant_domain"
|
||||
provider.region_config.name = "AzureCloud"
|
||||
|
||||
api_finding = DummyAPIFinding()
|
||||
api_finding.uid = "finding-uid-legacy"
|
||||
api_finding.status = "FAIL"
|
||||
api_finding.status_extended = "Legacy metadata finding"
|
||||
api_finding.check_id = (
|
||||
"entra_conditional_access_policy_require_mfa_for_management_api"
|
||||
)
|
||||
api_finding.check_metadata = {
|
||||
"provider": "azure",
|
||||
"checkid": "entra_conditional_access_policy_require_mfa_for_management_api",
|
||||
"checktitle": "Ensure Multifactor Authentication is Required for Windows Azure Service Management API",
|
||||
"checktype": [],
|
||||
"servicename": "entra",
|
||||
"subservicename": "",
|
||||
"severity": "medium",
|
||||
"resourcetype": "#microsoft.graph.conditionalAccess",
|
||||
"resourcegroup": "IAM",
|
||||
"description": "Legacy description",
|
||||
"risk": "Legacy risk",
|
||||
"relatedurl": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management",
|
||||
"additionalurls": [
|
||||
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
|
||||
],
|
||||
"remediation": {
|
||||
"code": {
|
||||
"cli": "",
|
||||
"other": "",
|
||||
"nativeiac": "",
|
||||
"terraform": "",
|
||||
},
|
||||
"recommendation": {
|
||||
"text": "Legacy remediation",
|
||||
"url": "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps",
|
||||
},
|
||||
},
|
||||
"resourceidtemplate": "",
|
||||
"categories": [],
|
||||
"dependson": [],
|
||||
"relatedto": [],
|
||||
"notes": "Legacy notes",
|
||||
}
|
||||
api_finding.muted = False
|
||||
api_resource = DummyResource(
|
||||
uid="/subscriptions/legacy-sub-id/providers/Microsoft.Authorization/policyAssignments/legacy",
|
||||
name="legacy-policy",
|
||||
resource_arn="arn",
|
||||
region="global",
|
||||
tags=[],
|
||||
)
|
||||
api_finding.resources = DummyResources(api_resource)
|
||||
|
||||
finding_obj = Finding.transform_api_finding(api_finding, provider)
|
||||
|
||||
assert finding_obj.account_uid == "legacy-sub-id"
|
||||
assert finding_obj.resource_uid == api_resource.uid
|
||||
assert finding_obj.resource_name == api_resource.name
|
||||
|
||||
meta = finding_obj.metadata
|
||||
assert (
|
||||
meta.CheckTitle
|
||||
== "Ensure Multifactor Authentication is Required for Windows Azure Service Management API"
|
||||
)
|
||||
assert (
|
||||
meta.RelatedUrl
|
||||
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-azure-management"
|
||||
)
|
||||
assert (
|
||||
meta.Remediation.Recommendation.Url
|
||||
== "https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
|
||||
)
|
||||
assert meta.ResourceGroup == "IAM"
|
||||
assert meta.AdditionalURLs == [
|
||||
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/concept-conditional-access-cloud-apps"
|
||||
]
|
||||
|
||||
@patch(
|
||||
"prowler.lib.outputs.finding.get_check_compliance",
|
||||
new=mock_get_check_compliance,
|
||||
|
||||
Reference in New Issue
Block a user