Compare commits

...

10 Commits

Author SHA1 Message Date
Pepe Fagoaga
dfcaeb3bd4 Merge branch 'master' into PRWLR-6129-review-check-report-init-for-region-resource-id-resource-arn 2025-02-05 14:08:25 +05:45
Pepe Fagoaga
1d06342856 fix: tests check report 2025-01-30 14:17:47 +01:00
Pepe Fagoaga
eee552dba4 chore: add TODO 2025-01-30 12:54:52 +01:00
Pepe Fagoaga
88455353ad fix(kubernetes): Handle check_report values and add tests 2025-01-30 12:53:47 +01:00
Pepe Fagoaga
d7e24fce30 fix(gcp): Handle values in check_report and add tests 2025-01-30 12:49:19 +01:00
Pepe Fagoaga
4460bc2e82 test(azure): subscription is None by default 2025-01-30 11:45:53 +01:00
Pepe Fagoaga
de05d910e1 chore: rename Check_Report to CheckReport 2025-01-30 11:44:50 +01:00
Pepe Fagoaga
8478da5b8f Merge branch 'master' of github.com:prowler-cloud/prowler into PRWLR-6129-review-check-report-init-for-region-resource-id-resource-arn 2025-01-30 11:24:33 +01:00
Pepe Fagoaga
210020021d fix(check_report): WIP 2025-01-30 08:07:04 +01:00
Pepe Fagoaga
bf4a8c6815 fix(check_report): Do not allow empty values 2025-01-29 16:57:01 +01:00
5 changed files with 369 additions and 51 deletions

View File

@@ -399,7 +399,7 @@ class Check(ABC, CheckMetadata):
@dataclass
class Check_Report:
class CheckReport:
"""Contains the Check's finding information."""
status: str
@@ -440,8 +440,17 @@ class Check_Report:
@dataclass
class Check_Report_AWS(Check_Report):
"""Contains the AWS Check's finding information."""
class Check_Report_AWS(CheckReport):
"""
Contains the AWS Check's finding information.
Attributes:
resource_id (str): The resource ID.
resource_arn (str): The resource ARN.
region (str): The region of the resource.
Raises:
AttributeError: If the any of the required attributes are not present.
"""
resource_id: str
resource_arn: str
@@ -449,21 +458,25 @@ class Check_Report_AWS(Check_Report):
def __init__(self, metadata: Dict, resource: Any) -> None:
super().__init__(metadata, resource)
self.resource_id = (
getattr(resource, "id", None) or getattr(resource, "name", None) or ""
)
self.resource_arn = getattr(resource, "arn", "")
self.region = getattr(resource, "region", "")
# Use resource.name as fallback if no resource.id
# If no name or id, an exception is raised
try:
self.resource_id = getattr(resource, "id")
except AttributeError:
self.resource_id = getattr(resource, "name")
# ARN and Region must be present
self.resource_arn = getattr(resource, "arn")
self.region = getattr(resource, "region")
@dataclass
class Check_Report_Azure(Check_Report):
class Check_Report_Azure(CheckReport):
"""Contains the Azure Check's finding information."""
resource_name: str
resource_id: str
subscription: str
location: str
subscription: str = None
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the Azure Check's finding information.
@@ -473,16 +486,21 @@ class Check_Report_Azure(Check_Report):
resource: Basic information about the resource. Defaults to None.
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.subscription = ""
try:
self.resource_id = getattr(resource, "id")
except AttributeError:
self.resource_id = getattr(resource, "resource_id")
try:
self.resource_name = getattr(resource, "name")
except AttributeError:
self.resource_name = getattr(resource, "resource_name")
self.location = getattr(resource, "location", "global")
@dataclass
class Check_Report_GCP(Check_Report):
class Check_Report_GCP(CheckReport):
"""Contains the GCP Check's finding information."""
resource_name: str
@@ -500,23 +518,30 @@ class Check_Report_GCP(Check_Report):
project_id=None,
) -> None:
super().__init__(metadata, resource)
self.resource_id = (
resource_id
or getattr(resource, "id", None)
or getattr(resource, "name", None)
or ""
)
self.resource_name = resource_name or getattr(resource, "name", "")
self.project_id = project_id or getattr(resource, "project_id", "")
self.location = (
location
or getattr(resource, "location", "")
or getattr(resource, "region", "")
)
self.resource_name = resource_name or getattr(resource, "name")
if resource_id:
self.resource_id = resource_id
else:
try:
self.resource_id = getattr(resource, "id")
except AttributeError:
self.resource_id = self.resource_name
self.project_id = project_id or getattr(resource, "project_id")
if location:
self.location = location
else:
try:
self.location = getattr(resource, "location")
except AttributeError:
self.location = getattr(resource, "region")
@dataclass
class Check_Report_Kubernetes(Check_Report):
class Check_Report_Kubernetes(CheckReport):
# TODO change class name to CheckReportKubernetes
"""Contains the Kubernetes Check's finding information."""
@@ -526,17 +551,18 @@ class Check_Report_Kubernetes(Check_Report):
def __init__(self, metadata: Dict, resource: Any) -> None:
super().__init__(metadata, resource)
self.resource_id = (
getattr(resource, "uid", None) or getattr(resource, "name", None) or ""
)
self.resource_name = getattr(resource, "name", "")
self.resource_name = getattr(resource, "name")
try:
self.resource_id = getattr(resource, "uid")
except AttributeError:
self.resource_id = self.resource_name
self.namespace = getattr(resource, "namespace", "cluster-wide")
if not self.namespace:
self.namespace = "cluster-wide"
# TODO: review this logic and add tests
@dataclass
class Check_Report_Microsoft365(Check_Report):
class Check_Report_Microsoft365(CheckReport):
"""Contains the Microsoft365 Check's finding information."""
resource_name: str
@@ -553,7 +579,7 @@ class Check_Report_Microsoft365(Check_Report):
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
resource, "name", getattr(resource, "resource_name")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.tenant_id = getattr(resource, "tenant_id", "")

View File

@@ -1,6 +1,6 @@
import sys
from prowler.lib.check.models import Check_Report
from prowler.lib.check.models import CheckReport
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
@@ -90,7 +90,7 @@ def display_compliance_table(
# TODO: this should be in the Check class
def get_check_compliance(
finding: Check_Report, provider_type: str, bulk_checks_metadata: dict
finding: CheckReport, provider_type: str, bulk_checks_metadata: dict
) -> dict:
"""get_check_compliance returns a map with the compliance framework as key and the requirements where the finding's check is present.
@@ -102,7 +102,7 @@ def get_check_compliance(
}
Args:
finding (Any): The Check_Report finding
finding (Any): The CheckReport finding
provider_type (str): The provider type
bulk_checks_metadata (dict): The bulk checks metadata

View File

@@ -4,7 +4,7 @@ from typing import Optional, Union
from pydantic import BaseModel, Field, ValidationError
from prowler.config.config import prowler_version
from prowler.lib.check.models import Check_Report, CheckMetadata
from prowler.lib.check.models import CheckMetadata, CheckReport
from prowler.lib.logger import logger
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.outputs.compliance.compliance import get_check_compliance
@@ -91,13 +91,13 @@ class Finding(BaseModel):
@classmethod
def generate_output(
cls, provider: Provider, check_output: Check_Report, output_options
cls, provider: Provider, check_output: CheckReport, output_options
) -> "Finding":
"""Generates the output for a finding based on the provider and output options
Args:
provider (Provider): the provider object
check_output (Check_Report): the check output object
check_output (CheckReport): the check output object
output_options: the output options object, depending on the provider
Returns:
finding_output (Finding): the finding output object

View File

@@ -1,6 +1,15 @@
from unittest import mock
from prowler.lib.check.models import CheckMetadata
import pytest
from prowler.lib.check.models import (
Check_Report_AWS,
Check_Report_Azure,
Check_Report_GCP,
Check_Report_Kubernetes,
CheckMetadata,
CheckReport,
)
from tests.lib.check.compliance_check_test import custom_compliance_metadata
mock_metadata = CheckMetadata(
@@ -63,7 +72,6 @@ mock_metadata_lambda = CheckMetadata(
class TestCheckMetada:
@mock.patch("prowler.lib.check.models.load_check_metadata")
@mock.patch("prowler.lib.check.models.recover_checks_from_provider")
def test_get_bulk(self, mock_recover_checks, mock_load_metadata):
@@ -324,3 +332,287 @@ class TestCheckMetada:
result = CheckMetadata.list(bulk_checks_metadata=bulk_metadata)
assert result == set()
class TestCheckReport:
def test_check_report_resource_dict(self):
resource = {"id": "test_id"}
check_report = CheckReport(metadata=mock_metadata.json(), resource=resource)
assert check_report.status == ""
assert check_report.check_metadata == mock_metadata
assert check_report.resource == resource
assert check_report.status_extended == ""
assert check_report.resource_details == ""
assert check_report.resource_tags == []
assert check_report.muted is False
# def test_check_report_resource_dict_method(self):
# resource = mock.Mock()
# resource.dict = lambda: {"id": "test_id"}
# check_report = CheckReport(metadata=mock_metadata.json(), resource=resource)
# assert check_report.status == ""
# assert check_report.check_metadata == mock_metadata
# assert check_report.resource == {"id": "test_id"}
# assert check_report.status_extended == ""
# assert check_report.resource_details == ""
# assert check_report.resource_tags == []
# assert check_report.muted is False
class TestCheckReportAWS:
def test_check_report_aws(self):
resource = mock.Mock(spec=["id", "arn", "region"])
resource.id = "test_id"
resource.arn = "test_arn"
resource.region = "test_region"
check_report_aws = Check_Report_AWS(
metadata=mock_metadata.json(), resource=resource
)
assert check_report_aws.resource_id == "test_id"
assert check_report_aws.resource_arn == "test_arn"
assert check_report_aws.region == "test_region"
def test_check_report_aws_no_id_but_name(self):
resource = mock.Mock(spec=["name", "arn", "region"])
resource.name = "test_id"
resource.arn = "test_arn"
resource.region = "test_region"
report = Check_Report_AWS(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "test_id"
assert report.resource_arn == "test_arn"
assert report.region == "test_region"
def test_check_report_aws_no_id_or_name(self):
resource = mock.Mock(spec=["arn", "region"])
resource.arn = "test_arn"
resource.region = "test_region"
with pytest.raises(AttributeError):
Check_Report_AWS(metadata=mock_metadata.json(), resource=resource)
def test_check_report_aws_no_arn(self):
resource = mock.Mock(spec=["id", "region"])
resource.id = "test_id"
resource.region = "test_region"
with pytest.raises(AttributeError):
Check_Report_AWS(metadata=mock_metadata.json(), resource=resource)
def test_check_report_aws_no_region(self):
resource = mock.Mock(spec=["id", "arn"])
resource.id = "test_id"
resource.arn = "test_arn"
with pytest.raises(AttributeError):
Check_Report_AWS(metadata=mock_metadata.json(), resource=resource)
# check finding without resource_id
# raise log error
# continue execution
class TestCheckReportAzure:
def test_check_report_azure(self):
resource = mock.Mock(spec=["id", "name", "location"])
resource.id = "test_id"
resource.name = "test_name"
resource.location = "test_location"
report = Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "test_id"
assert report.resource_name == "test_name"
assert report.location == "test_location"
assert report.subscription is None
def test_check_report_azure_no_id(self):
resource = mock.Mock(spec=["name", "location"])
resource.name = "test_name"
resource.location = "global"
with pytest.raises(AttributeError):
Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
def test_check_report_azure_resource_id(self):
resource = mock.Mock(spec=["resource_id", "name", "location"])
resource.resource_id = "resource_id"
resource.name = "test_name"
resource.location = "global"
report = Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "resource_id"
assert report.resource_name == "test_name"
assert report.location == "global"
assert report.subscription is None
def test_check_report_azure_no_name(self):
resource = mock.Mock(spec=["id", "location"])
resource.id = "test_id"
resource.location = "global"
with pytest.raises(AttributeError):
Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
def test_check_report_azure_resource_name(self):
resource = mock.Mock(spec=["id", "resource_name", "location"])
resource.id = "test_id"
resource.resource_name = "test_name"
resource.location = "global"
report = Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "test_id"
assert report.resource_name == "test_name"
assert report.location == "global"
assert report.subscription is None
def test_check_report_azure_no_location(self):
resource = mock.Mock(spec=["id", "name"])
resource.id = "test_id"
resource.name = "test_name"
report = Check_Report_Azure(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "test_id"
assert report.resource_name == "test_name"
assert report.location == "global"
assert report.subscription is None
class TestCheckReportGCP:
def test_check_report_gcp(self):
resource = mock.Mock(spec=["id", "name", "project_id", "location"])
resource.id = "test_id"
resource.name = "test_name"
resource.project_id = "test_project"
resource.location = "test_location"
report = Check_Report_GCP(metadata=mock_metadata.json(), resource=resource)
assert report.resource_id == "test_id"
assert report.resource_name == "test_name"
assert report.location == "test_location"
assert report.project_id == "test_project"
def test_check_report_gcp_resource_id(self):
resource = mock.Mock(spec=["id", "name", "project_id", "location"])
resource.id = "test_id"
resource.name = "test_name"
resource.project_id = "test_project"
resource.location = "test_location"
report = Check_Report_GCP(
metadata=mock_metadata.json(), resource=resource, resource_id="resource_1"
)
assert report.resource_id == "resource_1"
assert report.resource_name == "test_name"
assert report.location == "test_location"
assert report.project_id == "test_project"
def test_check_report_gcp_no_resource_id(self):
resource = mock.Mock(spec=["name", "project_id", "location"])
resource.name = "test_name"
resource.project_id = "test_project"
resource.location = "test_location"
report = Check_Report_GCP(
metadata=mock_metadata.json(),
resource=resource,
)
assert report.resource_id == "test_name"
assert report.resource_name == "test_name"
assert report.location == "test_location"
assert report.project_id == "test_project"
def test_check_report_gcp_resource_name(self):
resource = mock.Mock(spec=["id", "name", "project_id", "location"])
resource.id = "test_id"
resource.name = "test_name"
resource.project_id = "test_project"
resource.location = "test_location"
report = Check_Report_GCP(
metadata=mock_metadata.json(), resource=resource, resource_name="resource_1"
)
assert report.resource_id == "test_id"
assert report.resource_name == "resource_1"
assert report.location == "test_location"
assert report.project_id == "test_project"
def test_check_report_gcp_no_project_id(self):
resource = mock.Mock(spec=["id", "name", "location"])
resource.id = "test_id"
resource.name = "test_name"
resource.location = "test_location"
with pytest.raises(AttributeError):
Check_Report_GCP(metadata=mock_metadata.json(), resource=resource)
def test_check_report_gcp_no_location(self):
resource = mock.Mock(spec=["id", "name", "project_id"])
resource.id = "test_id"
resource.name = "test_name"
resource.project_id = "test_project"
with pytest.raises(AttributeError):
Check_Report_GCP(metadata=mock_metadata.json(), resource=resource)
def test_check_report_gcp_region(self):
resource = mock.Mock(spec=["id", "name", "region", "project_id"])
resource.id = "test_id"
resource.name = "test_name"
resource.region = "test_region"
resource.project_id = "test_project"
report = Check_Report_GCP(
metadata=mock_metadata.json(),
resource=resource,
)
assert report.resource_id == "test_id"
assert report.resource_name == "test_name"
assert report.location == "test_region"
assert report.project_id == "test_project"
class TestCheckReportKubernetes:
def test_check_report_kubernetes(self):
resource = mock.Mock(spec=["uid", "name", "namespace"])
resource.uid = "test_uid"
resource.name = "test_name"
resource.namespace = "test_namespace"
report = Check_Report_Kubernetes(
metadata=mock_metadata.json(), resource=resource
)
assert report.resource_id == "test_uid"
assert report.resource_name == "test_name"
assert report.namespace == "test_namespace"
def test_check_report_kubernetes_no_name(self):
resource = mock.Mock(spec=["uid", "namespace"])
resource.uid = "test_uid"
resource.namespace = "test_namespace"
with pytest.raises(AttributeError):
Check_Report_Kubernetes(metadata=mock_metadata.json(), resource=resource)
def test_check_report_kubernetes_no_uid(self):
resource = mock.Mock(spec=["name", "namespace"])
resource.name = "test_name"
resource.namespace = "test_namespace"
report = Check_Report_Kubernetes(
metadata=mock_metadata.json(), resource=resource
)
assert report.resource_id == "test_name"
assert report.resource_name == "test_name"
assert report.namespace == "test_namespace"
def test_check_report_kubernetes_no_namespace(self):
resource = mock.Mock(spec=["name"])
resource.name = "test_name"
report = Check_Report_Kubernetes(
metadata=mock_metadata.json(), resource=resource
)
assert report.resource_id == "test_name"
assert report.resource_name == "test_name"
assert report.namespace == "cluster-wide"

View File

@@ -6,7 +6,7 @@ from prowler.lib.check.compliance_models import (
Compliance,
Compliance_Requirement,
)
from prowler.lib.check.models import Check_Report, load_check_metadata
from prowler.lib.check.models import CheckReport, load_check_metadata
from prowler.lib.outputs.compliance.compliance import get_check_compliance
@@ -69,7 +69,7 @@ class TestCompliance:
),
]
finding = Check_Report(
finding = CheckReport(
metadata=load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/../fixtures/metadata.json"
).json(),
@@ -149,7 +149,7 @@ class TestCompliance:
),
]
finding = Check_Report(
finding = CheckReport(
metadata=load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/../fixtures/metadata.json"
).json(),
@@ -229,7 +229,7 @@ class TestCompliance:
),
]
finding = Check_Report(
finding = CheckReport(
metadata=load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/../fixtures/metadata.json"
).json(),
@@ -309,7 +309,7 @@ class TestCompliance:
),
]
finding = Check_Report(
finding = CheckReport(
metadata=load_check_metadata(
f"{path.dirname(path.realpath(__file__))}/../fixtures/metadata.json"
).json(),