Compare commits

...

2 Commits

Author SHA1 Message Date
Pepe Fagoaga
6a519067fb fix(tests): assert metadata and resource id/name 2025-05-06 09:12:17 +02:00
Pepe Fagoaga
29cc199c9d chore(resource): PoC for the initial version 2025-05-06 08:04:24 +02:00
12 changed files with 275 additions and 71 deletions

View File

@@ -415,6 +415,7 @@ class Check_Report:
resource: dict
resource_details: str
resource_tags: list
resource_service: str
muted: bool
def __init__(self, metadata: Dict, resource: Any) -> None:
@@ -444,6 +445,7 @@ class Check_Report:
self.resource_details = ""
self.resource_tags = getattr(resource, "tags", []) if resource else []
self.muted = False
self.resource_service = getattr(resource, "service", None)
@dataclass

View File

@@ -48,6 +48,7 @@ class Finding(BaseModel):
resource_name: str
resource_details: str
resource_tags: dict = Field(default_factory=dict)
resource_service: str
partition: Optional[str] = None
region: str
compliance: dict
@@ -133,6 +134,7 @@ class Finding(BaseModel):
try:
output_data["provider"] = provider.type
output_data["resource_metadata"] = check_output.resource
output_data["resource_service"] = check_output.resource_service
if provider.type == "aws":
output_data["account_uid"] = get_nested_attribute(

View File

View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
class Resource(BaseModel):
"""
Represents a generic resource.
Attributes:
service (str): The name of the service associated with the resource.
"""
service: str

View File

@@ -0,0 +1,50 @@
import re
from typing import List, Optional
from pydantic import Field, root_validator, validator
from prowler.lib.resource.resource import Resource
_ARN_PATTERN = re.compile(
r"^arn:(?P<partition>[^:]*):(?P<service>[^:]*):"
r"(?P<region>[^:]*):(?P<account>[^:]*):(?P<resource_id>.+)$"
)
class AWSResource(Resource):
"""
Represents an AWS resource with its associated attributes.
Attributes:
arn (str): The Amazon Resource Name (ARN) uniquely identifying the resource.
id (str): The unique identifier of the resource.
"""
arn: str
id: Optional[str] = None
name: Optional[str] = None
tags: List[str] = Field(default_factory=list)
region: str
@root_validator(pre=True)
def populate_from_arn(cls, values):
arn = values.get("arn")
if arn:
match = _ARN_PATTERN.match(arn)
if not match:
raise ValueError(f"Invalid ARN: {arn!r}")
# Only overwrite if not provided explicitly
values.setdefault("service", match.group("service"))
values.setdefault("region", match.group("region"))
# Extract the last part after the '/' if it exists
resource_id = match.group("resource_id")
values.setdefault(
"id", resource_id.split("/")[-1] if "/" in resource_id else resource_id
)
return values
@validator("region")
# TODO: validate regions
def region_must_be_valid(cls, v):
if not v:
raise ValueError("region cannot be empty")
return v

View File

@@ -32,7 +32,7 @@ class accessanalyzer_enabled(Check):
and not analyzer.region == accessanalyzer_client.region
):
report.muted = True
print(report)
findings.append(report)
return findings

View File

@@ -5,6 +5,7 @@ from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.resource.resource import AWSResource
from prowler.providers.aws.lib.service.service import AWSService
@@ -127,11 +128,8 @@ class Finding(BaseModel):
status: str = ""
class Analyzer(BaseModel):
arn: str
name: str
class Analyzer(AWSResource):
status: str
findings: list[Finding] = []
tags: Optional[list] = []
type: str
region: str

View File

@@ -0,0 +1,8 @@
from prowler.lib.resource.resource import Resource
class ResourceTest:
def test_resource(self):
resource = Resource(service="test_service")
assert resource.service == "test_service"
assert isinstance(resource, Resource)

View File

@@ -0,0 +1,97 @@
import pytest
from pydantic import ValidationError
# Adjust the import path as needed for your project layout
from prowler.lib.resource.resource import Resource
from prowler.providers.aws.lib.resource.resource import _ARN_PATTERN, AWSResource
# Sample ARNs for testing
VALID_S3_ARN = "arn:aws:s3:eu-central-1:123456789012:bucket/my-bucket"
VALID_LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:my-function"
INVALID_ARN = "arn:aws::123456:badformat"
class TestAWSResource:
def test_arn_regex_matches(self):
"""Ensure the regex itself captures the expected groups."""
m = _ARN_PATTERN.match(VALID_S3_ARN)
assert m
assert m.group("service") == "s3"
assert m.group("region") == "eu-central-1"
assert m.group("resource_id") == "bucket/my-bucket"
@pytest.mark.parametrize(
"arn,expected_service,expected_region,expected_id",
[
(VALID_S3_ARN, "s3", "eu-central-1", "bucket/my-bucket"),
(VALID_LAMBDA_ARN, "lambda", "us-east-1", "function:my-function"),
],
)
def test_populate_from_arn_defaults(
self, arn, expected_service, expected_region, expected_id
):
"""Fields service, region, and id should be autofilled from the ARN."""
resource = AWSResource(arn=arn)
assert isinstance(resource, Resource)
assert resource.service == expected_service
assert resource.region == expected_region
assert resource.id == expected_id
def test_override_service_and_region_and_id(self):
"""Explicitly provided values should override ARNderived defaults."""
resource = AWSResource(
arn=VALID_S3_ARN,
service="override-service",
region="override-region",
id="override-id",
)
assert resource.service == "override-service"
assert resource.region == "override-region"
assert resource.id == "override-id"
def test_missing_arn_raises_error(self):
"""If no ARN is given, an error is raised."""
with pytest.raises(ValidationError) as excinfo:
AWSResource(
service="manual-service", region="manual-region", id="manual-id"
)
assert (
"1 validation error for AWSResource\narn\n field required (type=value_error.missing)"
in str(excinfo.value)
)
def test_empty_region_raises_error(self):
"""An explicitly empty region (or an ARN with empty region) should error."""
# Empty string region
with pytest.raises(ValidationError) as excinfo:
AWSResource(arn=None, service="s3", region="")
assert "region cannot be empty" in str(excinfo.value)
# ARN with missing region segment
bad_arn = "arn:aws:s3::123456789012:bucket/x"
with pytest.raises(ValidationError) as excinfo2:
AWSResource(arn=bad_arn)
# root_validator will run first but region validator will catch empty region
assert "region cannot be empty" in str(excinfo2.value)
def test_invalid_arn_raises_value_error(self):
"""Malformed ARN should trigger ValueError from the root_validator."""
with pytest.raises(ValidationError) as excinfo:
AWSResource(arn=INVALID_ARN)
# unwrap to see the inner ValueError message
err = excinfo.value.errors()[0]
assert "Invalid ARN" in err["msg"]
def test_tags_default_and_mutability(self):
"""Ensure tags default to an empty list and are independent per instance."""
resource_1 = AWSResource(arn=VALID_S3_ARN)
resource_2 = AWSResource(arn=VALID_S3_ARN)
assert resource_1.tags == [] and resource_2.tags == []
resource_1.tags.append("tag1")
assert resource_2.tags == []
def test_name_field_passthrough(self):
"""Name should be accepted and stored unchanged."""
res = AWSResource(arn=VALID_S3_ARN, name="my-name")
assert res.name == "my-name"

View File

@@ -3,13 +3,14 @@ from unittest import mock
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
Analyzer,
)
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2
AWS_REGION_1 = "eu-west-1"
AWS_REGION_2 = "eu-west-2"
AWS_ACCOUNT_NUMBER = "123456789012"
AWS_ACCOUNT_ARN = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
ACCESS_ANALYZER_NAME = "test-analyzer"
ACCESS_ANALYZER_ARN = f"arn:aws:access-analyzer:{AWS_REGION_2}:{AWS_ACCOUNT_NUMBER}:analyzer/{ACCESS_ANALYZER_NAME}"
ACCESS_ANALYZER_ARN = f"arn:aws:access-analyzer:{AWS_REGION_EU_WEST_2}:{AWS_ACCOUNT_NUMBER}:analyzer/{ACCESS_ANALYZER_NAME}"
UNKNOWN_ACCESS_ANALYZER_NAME = "unknown"
UNKNOWN_ACCESS_ANALYZER_ARN = f"arn:aws:access-analyzer:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:analyzer/{UNKNOWN_ACCESS_ANALYZER_NAME}"
class Test_accessanalyzer_enabled:
@@ -33,31 +34,22 @@ class Test_accessanalyzer_enabled:
def test_one_analyzer_not_available(self):
# Include analyzers to check
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_1
accessanalyzer_client.region = AWS_REGION_EU_WEST_1
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
arn=UNKNOWN_ACCESS_ANALYZER_ARN,
name=UNKNOWN_ACCESS_ANALYZER_NAME,
status="NOT_AVAILABLE",
tags=[],
type="",
region=AWS_REGION_1,
)
]
with (
mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
),
mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:unknown",
),
with mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
):
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
accessanalyzer_enabled,
@@ -72,29 +64,39 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == "123456789012"
assert result[0].resource_arn == "arn:aws:iam::123456789012:root"
assert result[0].region == AWS_REGION_1
# Review this values too
assert result[0].resource_id == UNKNOWN_ACCESS_ANALYZER_NAME
assert result[0].resource_arn == UNKNOWN_ACCESS_ANALYZER_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
assert result[0].resource_service == "access-analyzer"
assert result[0].resource == {
"service": "access-analyzer",
"arn": UNKNOWN_ACCESS_ANALYZER_ARN,
"id": UNKNOWN_ACCESS_ANALYZER_NAME,
"name": UNKNOWN_ACCESS_ANALYZER_NAME,
"tags": [],
"region": "eu-west-1",
"status": "NOT_AVAILABLE",
"findings": [],
"type": "",
}
def test_one_analyzer_not_available_muted(self):
# Include analyzers to check
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_2
accessanalyzer_client.region = AWS_REGION_EU_WEST_2
accessanalyzer_client.audit_config = {"mute_non_default_regions": True}
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
arn=UNKNOWN_ACCESS_ANALYZER_ARN,
name=UNKNOWN_ACCESS_ANALYZER_NAME,
status="NOT_AVAILABLE",
tags=[],
type="",
region=AWS_REGION_1,
)
]
with (
@@ -102,10 +104,6 @@ class Test_accessanalyzer_enabled:
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
),
mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:unknown",
),
):
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
accessanalyzer_enabled,
@@ -121,27 +119,36 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == "123456789012"
assert result[0].resource_arn == "arn:aws:iam::123456789012:root"
assert result[0].region == AWS_REGION_1
assert result[0].resource_id == UNKNOWN_ACCESS_ANALYZER_NAME
assert result[0].resource_arn == UNKNOWN_ACCESS_ANALYZER_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
assert result[0].resource_service == "access-analyzer"
assert result[0].resource == {
"service": "access-analyzer",
"arn": UNKNOWN_ACCESS_ANALYZER_ARN,
"id": UNKNOWN_ACCESS_ANALYZER_NAME,
"name": UNKNOWN_ACCESS_ANALYZER_NAME,
"tags": [],
"region": "eu-west-1",
"status": "NOT_AVAILABLE",
"findings": [],
"type": "",
}
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_1
accessanalyzer_client.region = AWS_REGION_EU_WEST_1
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:analyzer/unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=f"arn:aws:accessanalyzer:{AWS_REGION_1}:{AWS_ACCOUNT_NUMBER}:analyzer/unknown",
name="analyzer/unknown",
arn=UNKNOWN_ACCESS_ANALYZER_ARN,
name=UNKNOWN_ACCESS_ANALYZER_NAME,
status="NOT_AVAILABLE",
tags=[],
type="",
region=AWS_REGION_1,
),
Analyzer(
arn=ACCESS_ANALYZER_ARN,
@@ -149,20 +156,14 @@ class Test_accessanalyzer_enabled:
status="ACTIVE",
tags=[],
type="",
region=AWS_REGION_2,
region=AWS_REGION_EU_WEST_2,
),
]
# Patch AccessAnalyzer Client
with (
mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
),
mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:analyzer/unknown",
),
with mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
):
# Test Check
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
@@ -179,13 +180,21 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == "analyzer/unknown"
assert (
result[0].resource_arn
== "arn:aws:accessanalyzer:eu-west-1:123456789012:analyzer/unknown"
)
assert result[0].resource_id == UNKNOWN_ACCESS_ANALYZER_NAME
assert result[0].resource_arn == UNKNOWN_ACCESS_ANALYZER_ARN
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource == {
"service": "access-analyzer",
"arn": UNKNOWN_ACCESS_ANALYZER_ARN,
"id": UNKNOWN_ACCESS_ANALYZER_NAME,
"name": UNKNOWN_ACCESS_ANALYZER_NAME,
"tags": [],
"region": AWS_REGION_EU_WEST_1,
"status": "NOT_AVAILABLE",
"findings": [],
"type": "",
}
assert result[1].status == "PASS"
assert (
@@ -195,7 +204,19 @@ class Test_accessanalyzer_enabled:
assert result[1].resource_id == ACCESS_ANALYZER_NAME
assert result[1].resource_arn == ACCESS_ANALYZER_ARN
assert result[1].resource_tags == []
assert result[1].region == AWS_REGION_2
assert result[1].region == AWS_REGION_EU_WEST_2
assert result[1].resource_service == "access-analyzer"
assert result[1].resource == {
"service": "access-analyzer",
"arn": ACCESS_ANALYZER_ARN,
"id": ACCESS_ANALYZER_NAME,
"name": ACCESS_ANALYZER_NAME,
"tags": [],
"region": AWS_REGION_EU_WEST_2,
"status": "ACTIVE",
"findings": [],
"type": "",
}
def test_one_active_analyzer(self):
accessanalyzer_client = mock.MagicMock
@@ -206,7 +227,6 @@ class Test_accessanalyzer_enabled:
status="ACTIVE",
tags=[],
type="",
region=AWS_REGION_2,
)
]
@@ -232,4 +252,16 @@ class Test_accessanalyzer_enabled:
assert result[0].resource_id == ACCESS_ANALYZER_NAME
assert result[0].resource_arn == ACCESS_ANALYZER_ARN
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_2
assert result[0].region == AWS_REGION_EU_WEST_2
assert result[0].resource_service == "access-analyzer"
assert result[0].resource == {
"service": "access-analyzer",
"arn": ACCESS_ANALYZER_ARN,
"id": ACCESS_ANALYZER_NAME,
"name": ACCESS_ANALYZER_NAME,
"tags": [],
"region": AWS_REGION_EU_WEST_2,
"status": "ACTIVE",
"findings": [],
"type": "",
}

View File

@@ -14,6 +14,8 @@ from tests.providers.aws.utils import (
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
ANALYZER_ARN = "arn:aws:access-analyzer:eu-west-1:111122223333:analyzer/TestAnalyzer"
def mock_make_api_call(self, operation_name, kwarg):
"""
@@ -27,8 +29,8 @@ def mock_make_api_call(self, operation_name, kwarg):
return {
"analyzers": [
{
"arn": "ARN",
"name": "Test Analyzer",
"arn": ANALYZER_ARN,
"name": "TestAnalyzer",
"status": "ACTIVE",
"findings": 0,
"tags": {"test": "test"},
@@ -98,8 +100,9 @@ class Test_AccessAnalyzer_Service:
set_mocked_aws_provider([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
)
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].arn == "ARN"
assert access_analyzer.analyzers[0].name == "Test Analyzer"
assert access_analyzer.analyzers[0].arn == ANALYZER_ARN
assert access_analyzer.analyzers[0].name == "TestAnalyzer"
assert access_analyzer.analyzers[0].service == "access-analyzer"
assert access_analyzer.analyzers[0].status == "ACTIVE"
assert access_analyzer.analyzers[0].tags == [{"test": "test"}]
assert access_analyzer.analyzers[0].type == "ACCOUNT"