fix(aws): nonetype errors in opensearch, firehose and cognito (#8670)

This commit is contained in:
Daniel Barranquero
2025-09-09 09:27:57 +02:00
committed by GitHub
parent 02b7c5328f
commit 74bf0e6b47
7 changed files with 195 additions and 7 deletions

View File

@@ -32,6 +32,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Renamed `AdditionalUrls` to `AdditionalURLs` field in CheckMetadata [(#8639)](https://github.com/prowler-cloud/prowler/pull/8639)
- TypeError from Python 3.9 in Security Hub module by updating type annotations [(#8619)](https://github.com/prowler-cloud/prowler/pull/8619)
- KeyError when SecurityGroups field is missing in MemoryDB check [(#8666)](https://github.com/prowler-cloud/prowler/pull/8666)
- NoneType error in Opensearch, Firehose and Cognito checks [(#8670)](https://github.com/prowler-cloud/prowler/pull/8670)
---

View File

@@ -14,7 +14,10 @@ class cognito_user_pool_self_registration_disabled(Check):
report.status_extended = (
f"User pool {user_pool.id} has self registration disabled."
)
if not user_pool.admin_create_user_config.allow_admin_create_user_only:
if (
user_pool.admin_create_user_config
and not user_pool.admin_create_user_config.allow_admin_create_user_only
):
report.status = "FAIL"
report.status_extended = (
f"User pool {user_pool.id} has self registration enabled."

View File

@@ -32,9 +32,10 @@ class firehose_stream_encrypted_at_rest(Check):
source_stream = kinesis_client.streams.get(
stream.source.kinesis_stream.kinesis_stream_arn
)
if source_stream.encrypted_at_rest != EncryptionType.NONE:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
if source_stream:
if source_stream.encrypted_at_rest != EncryptionType.NONE:
report.status = "PASS"
report.status_extended = f"Firehose Stream {stream.name} does not have at rest encryption enabled but the source stream {source_stream.name} has at rest encryption enabled."
# Check if the stream has encryption enabled directly
elif stream.kms_encryption == EncryptionStatus.ENABLED:

View File

@@ -79,7 +79,7 @@ class OpenSearchService(AWSService):
"AdvancedSecurityOptions"
].get("Enabled", False)
cluster_config = describe_domain["DomainStatus"].get("ClusterConfig", {})
domain.instance_count = cluster_config.get("InstanceCount", None)
domain.instance_count = cluster_config.get("InstanceCount", 0)
domain.zone_awareness_enabled = cluster_config.get(
"ZoneAwarenessEnabled", False
)
@@ -155,10 +155,10 @@ class OpenSearchDomain(BaseModel):
saml_enabled: bool = None
update_available: bool = None
version: str = None
instance_count: Optional[int]
instance_count: int = 0
zone_awareness_enabled: Optional[bool]
tags: Optional[list] = []
advanced_settings_enabled: bool = None
dedicated_master_enabled: Optional[bool]
dedicated_master_count: Optional[int]
dedicated_master_count: int = 0
tags: Optional[list] = []

View File

@@ -245,3 +245,57 @@ class Test_cognito_user_pool_self_registration_disabled:
assert result[0].resource_id == user_pool_id
assert result[0].resource_arn == user_pool_arn
def test_cognito_user_pool_admin_create_user_config_none(self):
"""Test case when admin_create_user_config is None - should not crash"""
cognito_client = mock.MagicMock
user_pool_arn = f"arn:aws:cognito-idp:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:userpool/eu-west-1_123456789"
user_pool_id = "eu-west-1_123456789"
user_pool_name = "eu-west-1_123456789"
cognito_client.user_pools = {
user_pool_arn: UserPool(
admin_create_user_config=None, # This is the key test case
region=AWS_REGION_US_EAST_1,
id=user_pool_id,
arn=user_pool_arn,
name=user_pool_name,
last_modified=datetime.now(),
creation_date=datetime.now(),
status="ACTIVE",
)
}
cognito_identity_client = mock.MagicMock
cognito_identity_client.identity_pools = {}
with (
mock.patch(
"prowler.providers.aws.services.cognito.cognito_service.CognitoIDP",
cognito_client,
),
mock.patch(
"prowler.providers.aws.services.cognito.cognito_idp_client.cognito_idp_client",
cognito_client,
),
mock.patch(
"prowler.providers.aws.services.cognito.cognito_service.CognitoIdentity",
cognito_identity_client,
),
mock.patch(
"prowler.providers.aws.services.cognito.cognito_identity_client.cognito_identity_client",
cognito_identity_client,
),
):
from prowler.providers.aws.services.cognito.cognito_user_pool_self_registration_disabled.cognito_user_pool_self_registration_disabled import (
cognito_user_pool_self_registration_disabled,
)
check = cognito_user_pool_self_registration_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"User pool {user_pool_name} has self registration disabled."
)
assert result[0].resource_id == user_pool_id
assert result[0].resource_arn == user_pool_arn

View File

@@ -324,3 +324,58 @@ class Test_firehose_stream_encrypted_at_rest:
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled but the source stream test-kinesis-stream has at rest encryption enabled."
)
@mock_aws
def test_stream_kinesis_source_not_found(self):
"""Test case when Kinesis source stream is not found - should handle None gracefully"""
# Generate Firehose client
firehose_client = client("firehose", region_name=AWS_REGION_EU_WEST_1)
delivery_stream = firehose_client.create_delivery_stream(
DeliveryStreamName="test-delivery-stream",
DeliveryStreamType="KinesisStreamAsSource",
KinesisStreamSourceConfiguration={
"KinesisStreamARN": f"arn:aws:kinesis:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:stream/non-existent-stream",
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
},
S3DestinationConfiguration={
"RoleARN": "arn:aws:iam::012345678901:role/firehose-role",
"BucketARN": "arn:aws:s3:::test-bucket",
"Prefix": "",
"BufferingHints": {"IntervalInSeconds": 300, "SizeInMBs": 5},
"CompressionFormat": "UNCOMPRESSED",
},
Tags=[{"Key": "key", "Value": "value"}],
)
arn = delivery_stream["DeliveryStreamARN"]
stream_name = arn.split("/")[-1]
from prowler.providers.aws.services.firehose.firehose_service import Firehose
from prowler.providers.aws.services.kinesis.kinesis_service import Kinesis
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.firehose_client",
new=Firehose(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest.kinesis_client",
new=Kinesis(aws_provider),
):
# Test Check
from prowler.providers.aws.services.firehose.firehose_stream_encrypted_at_rest.firehose_stream_encrypted_at_rest import (
firehose_stream_encrypted_at_rest,
)
check = firehose_stream_encrypted_at_rest()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Firehose Stream {stream_name} does not have at rest encryption enabled or the source stream is not encrypted."
)

View File

@@ -173,3 +173,77 @@ class TestOpenSearchServiceService:
assert opensearch.opensearch_domains[domain_arn].tags == [
{"Key": "test", "Value": "test"},
]
# Test OpenSearchService with missing optional fields
@mock_aws
def test_describe_domain_with_missing_fields(self):
"""Test case when some optional fields are missing - should handle gracefully"""
def mock_make_api_call_missing_fields(self, operation_name, kwarg):
if operation_name == "ListDomainNames":
return {
"DomainNames": [
{
"DomainName": test_domain_name,
},
]
}
if operation_name == "DescribeDomain":
return {
"DomainStatus": {
"ARN": domain_arn,
"Endpoints": {
"vpc": "vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.eu-west-1.es.amazonaws.com"
},
"EngineVersion": "opensearch-version1",
"VPCOptions": {
"VPCId": "test-vpc-id",
},
"ClusterConfig": {
"DedicatedMasterEnabled": True,
"DedicatedMasterCount": 1,
"DedicatedMasterType": "m3.medium.search",
"InstanceCount": 1,
"ZoneAwarenessEnabled": True,
},
"CognitoOptions": {"Enabled": True},
"EncryptionAtRestOptions": {"Enabled": True},
"NodeToNodeEncryptionOptions": {"Enabled": True},
"AdvancedOptions": {"string": "string"},
"ServiceSoftwareOptions": {"UpdateAvailable": True},
"DomainEndpointOptions": {"EnforceHTTPS": True},
"AdvancedSecurityOptions": {
"Enabled": True,
"InternalUserDatabaseEnabled": True,
"SAMLOptions": {"Enabled": True},
},
"AccessPolicies": policy_json,
"LogPublishingOptions": {
"SEARCH_SLOW_LOGS": {"Enabled": True},
"INDEX_SLOW_LOGS": {"Enabled": True},
"AUDIT_LOGS": {"Enabled": True},
},
}
}
if operation_name == "ListTags":
return {
"TagList": [
{"Key": "test", "Value": "test"},
]
}
return make_api_call(self, operation_name, kwarg)
with patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_missing_fields,
):
aws_provider = set_mocked_aws_provider([])
opensearch = OpenSearchService(aws_provider)
# Should not crash even with missing optional fields
assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
assert (
opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
)
assert opensearch.opensearch_domains[domain_arn].arn == domain_arn