feat(aws): Add new checks ses_identities/glue_data_catalogs/secretsmanager _not_publicly_accessible (#5471)

This commit is contained in:
Mario Rodriguez Lopez
2024-10-18 22:40:12 +02:00
committed by GitHub
parent 2b34fd39f6
commit 50cb79ee2f
26 changed files with 1215 additions and 289 deletions

View File

@@ -9767,6 +9767,39 @@
]
}
},
"sesv2": {
"regions": {
"aws": [
"af-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ca-central-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"il-central-1",
"me-south-1",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2"
],
"aws-cn": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
}
},
"shield": {
"regions": {
"aws": [
@@ -11421,4 +11454,4 @@
}
}
}
}
}

View File

@@ -5,21 +5,25 @@ from prowler.providers.aws.services.glue.glue_client import glue_client
class glue_data_catalogs_connection_passwords_encryption_enabled(Check):
def execute(self):
findings = []
for encryption in glue_client.catalog_encryption_settings:
for data_catalog in glue_client.data_catalogs.values():
# Check only if there are Glue Tables
if encryption.tables or glue_client.provider.scan_unused_services:
if data_catalog.tables or glue_client.provider.scan_unused_services:
report = Check_Report_AWS(self.metadata())
report.resource_id = glue_client.audited_account
report.resource_arn = glue_client._get_data_catalog_arn_template(
encryption.region
data_catalog.region
)
report.region = encryption.region
report.region = data_catalog.region
report.status = "FAIL"
report.status = "FAIL"
report.status_extended = (
"Glue data catalog connection password is not encrypted."
)
if encryption.password_encryption:
if (
data_catalog.encryption_settings
and data_catalog.encryption_settings.password_encryption
):
report.status = "PASS"
report.status_extended = f"Glue data catalog connection password is encrypted with KMS key {encryption.password_kms_id}."
report.status_extended = f"Glue data catalog connection password is encrypted with KMS key {data_catalog.encryption_settings.password_kms_id}."
findings.append(report)
return findings

View File

@@ -5,21 +5,24 @@ from prowler.providers.aws.services.glue.glue_client import glue_client
class glue_data_catalogs_metadata_encryption_enabled(Check):
def execute(self):
findings = []
for encryption in glue_client.catalog_encryption_settings:
for data_catalog in glue_client.data_catalogs.values():
# Check only if there are Glue Tables
if encryption.tables or glue_client.provider.scan_unused_services:
if data_catalog.tables or glue_client.provider.scan_unused_services:
report = Check_Report_AWS(self.metadata())
report.resource_id = glue_client.audited_account
report.resource_arn = glue_client._get_data_catalog_arn_template(
encryption.region
data_catalog.region
)
report.region = encryption.region
report.region = data_catalog.region
report.status = "FAIL"
report.status_extended = (
"Glue data catalog settings have metadata encryption disabled."
)
if encryption.mode == "SSE-KMS":
if (
data_catalog.encryption_settings
and data_catalog.encryption_settings.mode == "SSE-KMS"
):
report.status = "PASS"
report.status_extended = f"Glue data catalog settings have metadata encryption enabled with KMS key {encryption.kms_id}."
report.status_extended = f"Glue data catalog settings have metadata encryption enabled with KMS key {data_catalog.encryption_settings.kms_id}."
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "glue_data_catalogs_not_publicly_accessible",
"CheckTitle": "Ensure Glue Data Catalogs are not publicly accessible.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "glue",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:glue:region:account-id:catalog",
"Severity": "high",
"ResourceType": "AwsGlueDataCatalog",
"Description": "This control checks whether Glue Data Catalogs are not publicly accessible via resource policies.",
"Risk": "Publicly accessible Glue Data Catalogs can expose sensitive data schema and metadata, leading to potential security risks.",
"RelatedUrl": "https://docs.aws.amazon.com/glue/latest/dg/security_iam_service-with-iam.html?icmpid=docs_console_unmapped#security_iam_service-with-iam-resource-based-policies",
"Remediation": {
"Code": {
"CLI": "aws glue delete-resource-policy",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review Glue Data Catalog policies and ensure they are not publicly accessible. Implement the Principle of Least Privilege.",
"Url": "https://docs.aws.amazon.com/glue/latest/dg/security_iam_service-with-iam.html?icmpid=docs_console_unmapped#security_iam_service-with-iam-resource-based-policies"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.glue.glue_client import glue_client
from prowler.providers.aws.services.iam.lib.policy import is_policy_public
class glue_data_catalogs_not_publicly_accessible(Check):
def execute(self):
findings = []
for data_catalog in glue_client.data_catalogs.values():
report = Check_Report_AWS(self.metadata())
report.region = data_catalog.region
report.resource_id = glue_client.audited_account
report.resource_arn = glue_client._get_data_catalog_arn_template(
data_catalog.region
)
report.status = "PASS"
report.status_extended = "Glue Data Catalog is not publicly accessible."
if is_policy_public(
data_catalog.policy,
glue_client.audited_account,
):
report.status = "FAIL"
report.status_extended = "Glue Data Catalog is publicly accessible due to its resource policy."
findings.append(report)
return findings

View File

@@ -1,3 +1,4 @@
import json
from typing import Optional
from botocore.exceptions import ClientError
@@ -17,8 +18,9 @@ class Glue(AWSService):
self.__threading_call__(self._list_tags, self.connections)
self.tables = []
self.__threading_call__(self._search_tables)
self.catalog_encryption_settings = []
self.__threading_call__(self._get_data_catalog_encryption_settings)
self.data_catalogs = {}
self.__threading_call__(self._get_data_catalogs)
self.__threading_call__(self._get_resource_policy, self.data_catalogs.values())
self.dev_endpoints = []
self.__threading_call__(self._get_dev_endpoints)
self.__threading_call__(self._list_tags, self.dev_endpoints)
@@ -181,8 +183,8 @@ class Glue(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_data_catalog_encryption_settings(self, regional_client):
logger.info("Glue - Catalog Encryption Settings...")
def _get_data_catalogs(self, regional_client):
logger.info("Glue - Catalog ...")
try:
settings = regional_client.get_data_catalog_encryption_settings()[
"DataCatalogEncryptionSettings"
@@ -191,19 +193,20 @@ class Glue(AWSService):
for table in self.tables:
if table.region == regional_client.region:
tables_in_region = True
self.catalog_encryption_settings.append(
CatalogEncryptionSetting(
mode=settings["EncryptionAtRest"]["CatalogEncryptionMode"],
kms_id=settings["EncryptionAtRest"].get("SseAwsKmsKeyId"),
password_encryption=settings["ConnectionPasswordEncryption"][
"ReturnConnectionPasswordEncrypted"
],
password_kms_id=settings["ConnectionPasswordEncryption"].get(
"AwsKmsKeyId"
),
region=regional_client.region,
tables=tables_in_region,
)
catalog_encryption_settings = CatalogEncryptionSetting(
mode=settings["EncryptionAtRest"]["CatalogEncryptionMode"],
kms_id=settings["EncryptionAtRest"].get("SseAwsKmsKeyId"),
password_encryption=settings["ConnectionPasswordEncryption"][
"ReturnConnectionPasswordEncrypted"
],
password_kms_id=settings["ConnectionPasswordEncryption"].get(
"AwsKmsKeyId"
),
)
self.data_catalogs[regional_client.region] = DataCatalog(
tables=tables_in_region,
region=regional_client.region,
encryption_settings=catalog_encryption_settings,
)
except Exception as error:
logger.error(
@@ -246,6 +249,27 @@ class Glue(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_resource_policy(self, data_catalog):
logger.info("Glue - Getting Resource Policy...")
try:
data_catalog_policy = self.regional_clients[
data_catalog.region
].get_resource_policy()
data_catalog.policy = json.loads(data_catalog_policy["PolicyInJson"])
except ClientError as error:
if error.response["Error"]["Code"] == "EntityNotFoundException":
logger.warning(
f"{data_catalog.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{data_catalog.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{data_catalog.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Connection(BaseModel):
name: str
@@ -269,8 +293,6 @@ class CatalogEncryptionSetting(BaseModel):
kms_id: Optional[str]
password_encryption: bool
password_kms_id: Optional[str]
tables: bool
region: str
class DevEndpoint(BaseModel):
@@ -308,3 +330,10 @@ class MLTransform(BaseModel):
user_data_encryption: str
region: str
tags: Optional[list]
class DataCatalog(BaseModel):
tables: bool
region: str
encryption_settings: Optional[CatalogEncryptionSetting]
policy: Optional[dict]

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "secretsmanager_not_publicly_accessible",
"CheckTitle": "Ensure Secrets Manager secrets are not publicly accessible.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "secretsmanager",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:secretsmanager:region:account-id:secret:secret-name",
"Severity": "high",
"ResourceType": "AwsSecretsManagerSecret",
"Description": "This control checks whether Secrets Manager secrets are not publicly accessible via resource policies.",
"Risk": "Publicly accessible secrets can expose sensitive information and pose a security risk.",
"RelatedUrl": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_resource-policies.html",
"Remediation": {
"Code": {
"CLI": "aws secretsmanager delete-resource-policy --secret-id <secret-id>",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review and remove any public access from Secrets Manager policies to follow the Principle of Least Privilege.",
"Url": "https://docs.aws.amazon.com/secretsmanager/latest/userguide/determine-acccess_examine-iam-policies.html"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,30 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.iam.lib.policy import is_policy_public
from prowler.providers.aws.services.secretsmanager.secretsmanager_client import (
secretsmanager_client,
)
class secretsmanager_not_publicly_accessible(Check):
def execute(self):
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report_AWS(self.metadata())
report.region = secret.region
report.resource_id = secret.name
report.resource_arn = secret.arn
report.resource_tags = secret.tags
report.status = "PASS"
report.status_extended = (
f"SecretsManager secret {secret.name} is not publicly accessible."
)
if is_policy_public(
secret.policy,
secretsmanager_client.audited_account,
):
report.status = "FAIL"
report.status_extended = f"SecretsManager secret {secret.name} is publicly accessible due to its resource policy."
findings.append(report)
return findings

View File

@@ -1,3 +1,4 @@
import json
from datetime import datetime, timezone
from typing import Optional
@@ -14,6 +15,7 @@ class SecretsManager(AWSService):
super().__init__(__class__.__name__, provider)
self.secrets = {}
self.__threading_call__(self._list_secrets)
self.__threading_call__(self._get_resource_policy, self.secrets.values())
def _list_secrets(self, regional_client):
logger.info("SecretsManager - Listing Secrets...")
@@ -46,11 +48,27 @@ class SecretsManager(AWSService):
f" {error}"
)
def _get_resource_policy(self, secret):
logger.info("SecretsManager - Getting Resource Policy...")
try:
secret_policy = self.regional_clients[secret.region].get_resource_policy(
SecretId=secret.arn
)
if secret_policy.get("ResourcePolicy"):
secret.policy = json.loads(secret_policy["ResourcePolicy"])
except Exception as error:
logger.error(
f"{self.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
class Secret(BaseModel):
arn: str
name: str
region: str
policy: Optional[dict] = None
rotation_enabled: bool = False
last_accessed_date: datetime
tags: Optional[list] = []

View File

@@ -0,0 +1,4 @@
from prowler.providers.aws.services.ses.ses_service import SES
from prowler.providers.common.provider import Provider
ses_client = SES(Provider.get_global_provider())

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ses_identity_not_publicly_accessible",
"CheckTitle": "Ensure that SES identities are not publicly accessible",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "ses",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:ses:region:account-id:identity/<IDENTITY-NAME>",
"Severity": "high",
"ResourceType": "AwsSesIdentity",
"Description": "This control checks whether SES identities are not publicly accessible via resource policies.",
"Risk": "Publicly accessible SES identities can allow unauthorized email sending or receiving, leading to potential abuse or phishing attacks.",
"RelatedUrl": "https://docs.aws.amazon.com/ses/latest/dg/identity-authorization-policies.html",
"Remediation": {
"Code": {
"CLI": "aws ses delete-email-identity-policy --identity <IDENTITY-NAME> --policy-name <POLICY-NAME>",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review and restrict SES identity policies to prevent public access. Ensure policies follow the Principle of Least Privilege.",
"Url": "https://docs.aws.amazon.com/ses/latest/dg/policy-anatomy.html"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.iam.lib.policy import is_policy_public
from prowler.providers.aws.services.ses.ses_client import ses_client
class ses_identity_not_publicly_accessible(Check):
def execute(self):
findings = []
for identity in ses_client.email_identities.values():
report = Check_Report_AWS(self.metadata())
report.region = identity.region
report.resource_id = identity.name
report.resource_arn = identity.arn
report.resource_tags = identity.tags
report.status = "PASS"
report.status_extended = (
f"SES identity {identity.name} is not publicly accessible."
)
if is_policy_public(
identity.policy,
ses_client.audited_account,
):
report.status = "FAIL"
report.status_extended = f"SES identity {identity.name} is publicly accessible due to its resource policy."
findings.append(report)
return findings

View File

@@ -0,0 +1,69 @@
from json import loads
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
class SES(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__("sesv2", provider)
self.email_identities = {}
self.__threading_call__(self._list_email_identities)
self.__threading_call__(
self._get_email_identities, self.email_identities.values()
)
def _list_email_identities(self, regional_client):
logger.info("SES - describing identities...")
try:
response = regional_client.list_email_identities()
for email_identity in response["EmailIdentities"]:
identity_arn = f"arn:{self.audited_partition}:ses:{regional_client.region}:{self.audited_account}:identity/{email_identity['IdentityName']}"
if not self.audit_resources or (
is_resource_filtered(identity_arn, self.audit_resources)
):
self.email_identities[identity_arn] = Identity(
arn=identity_arn,
type=email_identity["IdentityType"],
name=email_identity["IdentityName"],
region=regional_client.region,
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_email_identities(self, identity):
try:
logger.info("SES - describing email identities ...")
try:
regional_client = self.regional_clients[identity.region]
identity_attributes = regional_client.get_email_identity(
EmailIdentity=identity.name
)
for _, content in identity_attributes["Policies"].items():
identity.policy = loads(content)
identity.tags = identity_attributes["Tags"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Identity(BaseModel):
name: str
arn: str
region: str
type: Optional[str]
policy: Optional[dict] = None
tags: Optional[list] = []

View File

@@ -1,26 +1,92 @@
from unittest.mock import MagicMock, patch
from unittest import mock
from prowler.providers.aws.services.glue.glue_service import CatalogEncryptionSetting
import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.glue.glue_service import Glue
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_COMMERCIAL_PARTITION,
AWS_REGION_US_EAST_1,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "DISABLED",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": False,
"AwsKmsKeyId": "password_key",
},
}
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v2(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "DISABLED",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": False,
"AwsKmsKeyId": "password_key",
},
}
}
elif operation_name == "SearchTables":
return {
"TableList": [
{
"Name": "test-table",
"DatabaseName": "test-database",
"CatalogId": AWS_ACCOUNT_NUMBER,
}
],
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v3(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "SSE-KMS",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "kms-key",
},
}
}
return make_api_call(self, operation_name, kwarg)
class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
def test_glue_no_settings(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = []
@mock_aws
def test_glue_no_data_catalogs(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled import (
@@ -32,32 +98,18 @@ class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_glue_catalog_password_unencrypted(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="DISABLED",
tables=False,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
)
]
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.region = AWS_REGION_US_EAST_1
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled import (
@@ -74,36 +126,25 @@ class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
== "Glue data catalog connection password is not encrypted."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
def test_glue_catalog_password_unencrypted_ignoring(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="DISABLED",
tables=False,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
]
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.region = AWS_REGION_US_EAST_1
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
glue_client.provider._scan_unused_services = False
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_glue_catalog_password_unencrypted_ignoring(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
aws_provider._scan_unused_services = False
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled import (
@@ -115,33 +156,19 @@ class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v2)
def test_glue_catalog_password_unencrypted_ignoring_with_tables(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="DISABLED",
tables=True,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
)
]
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
glue_client.provider._scan_unused_services = False
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
aws_provider._scan_unused_services = False
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled import (
@@ -158,35 +185,24 @@ class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
== "Glue data catalog connection password is not encrypted."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
def test_glue_catalog_encrypted(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="DISABLED",
tables=False,
region=AWS_REGION_US_EAST_1,
password_encryption=True,
password_kms_id="kms-key",
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
]
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
assert result[0].region == AWS_REGION_EU_WEST_1
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v3)
def test_glue_catalog_encrypted(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_connection_passwords_encryption_enabled.glue_data_catalogs_connection_passwords_encryption_enabled import (
@@ -203,5 +219,8 @@ class Test_glue_data_catalogs_connection_passwords_encryption_enabled:
== "Glue data catalog connection password is encrypted with KMS key kms-key."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -1,27 +1,92 @@
from re import search
from unittest.mock import MagicMock, patch
from unittest import mock
from prowler.providers.aws.services.glue.glue_service import CatalogEncryptionSetting
import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.glue.glue_service import Glue
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_COMMERCIAL_PARTITION,
AWS_REGION_US_EAST_1,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "DISABLED",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "password_key",
},
}
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v2(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "DISABLED",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "password_key",
},
}
}
elif operation_name == "SearchTables":
return {
"TableList": [
{
"Name": "test-table",
"DatabaseName": "test-database",
"CatalogId": AWS_ACCOUNT_NUMBER,
}
],
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v3(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "SSE-KMS",
"SseAwsKmsKeyId": "kms-key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "password_key",
},
}
}
return make_api_call(self, operation_name, kwarg)
class Test_glue_data_catalogs_metadata_encryption_enabled:
def test_glue_no_settings(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider
glue_client.catalog_encryption_settings = []
@mock_aws
def test_glue_no_data_catalogs(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled import (
@@ -33,32 +98,18 @@ class Test_glue_data_catalogs_metadata_encryption_enabled:
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_glue_catalog_unencrypted(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="disabled.",
tables=False,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
)
]
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled import (
@@ -75,36 +126,24 @@ class Test_glue_data_catalogs_metadata_encryption_enabled:
== "Glue data catalog settings have metadata encryption disabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
def test_glue_catalog_unencrypted_ignoring(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="disabled.",
tables=False,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
]
glue_client.provider._scan_unused_services = False
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_glue_catalog_unencrypted_ignoring(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
aws_provider._scan_unused_services = False
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled import (
@@ -116,33 +155,18 @@ class Test_glue_data_catalogs_metadata_encryption_enabled:
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v2)
def test_glue_catalog_unencrypted_ignoring_with_tables(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="disabled.",
tables=True,
kms_id=None,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
)
]
glue_client.provider._scan_unused_services = False
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
aws_provider._scan_unused_services = False
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled import (
@@ -154,40 +178,29 @@ class Test_glue_data_catalogs_metadata_encryption_enabled:
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"Glue data catalog settings have metadata encryption disabled.",
result[0].status_extended,
assert (
result[0].status_extended
== "Glue data catalog settings have metadata encryption disabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
def test_glue_catalog_encrypted(self):
glue_client = MagicMock
glue_client.provider = set_mocked_aws_provider()
glue_client.catalog_encryption_settings = [
CatalogEncryptionSetting(
mode="SSE-KMS",
kms_id="kms-key",
tables=False,
region=AWS_REGION_US_EAST_1,
password_encryption=False,
password_kms_id=None,
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
]
glue_client.region = AWS_REGION_US_EAST_1
glue_client.audited_account = AWS_ACCOUNT_NUMBER
glue_client.audited_partition = AWS_COMMERCIAL_PARTITION
glue_client.data_catalog_arn_template = f"arn:{glue_client.audited_partition}:glue:{glue_client.region}:{glue_client.audited_account}:data-catalog"
glue_client._get_data_catalog_arn_template = MagicMock(
return_value=glue_client.data_catalog_arn_template
)
with patch(
"prowler.providers.aws.services.glue.glue_service.Glue",
new=glue_client,
), patch(
"prowler.providers.aws.services.glue.glue_client.glue_client",
new=glue_client,
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v3)
def test_glue_catalog_encrypted(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_metadata_encryption_enabled.glue_data_catalogs_metadata_encryption_enabled import (
@@ -204,5 +217,8 @@ class Test_glue_data_catalogs_metadata_encryption_enabled:
== "Glue data catalog settings have metadata encryption enabled with KMS key kms-key."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == glue_client.data_catalog_arn_template
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -0,0 +1,146 @@
from unittest import mock
import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.glue.glue_service import Glue
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "SSE-KMS",
"SseAwsKmsKeyId": "kms_key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "password_key",
},
}
}
elif operation_name == "GetResourcePolicy":
return {
"PolicyInJson": '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"arn:aws:iam::123456789012:root","Action":"secretsmanager:GetSecretValue","Resource":"*"}]}',
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v2(self, operation_name, kwarg):
if operation_name == "GetDataCatalogEncryptionSettings":
return {
"DataCatalogEncryptionSettings": {
"EncryptionAtRest": {
"CatalogEncryptionMode": "SSE-KMS",
"SseAwsKmsKeyId": "kms_key",
},
"ConnectionPasswordEncryption": {
"ReturnConnectionPasswordEncrypted": True,
"AwsKmsKeyId": "password_key",
},
}
}
elif operation_name == "GetResourcePolicy":
return {
"PolicyInJson": '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"glue:*","Resource":"*"}]}',
}
return make_api_call(self, operation_name, kwarg)
class Test_glue_data_catalogs_not_publicly_accessible:
@mock_aws
def test_glue_no_data_catalogs(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible import (
glue_data_catalogs_not_publicly_accessible,
)
check = glue_data_catalogs_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_glue_data_catalog_not_public_policy(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible import (
glue_data_catalogs_not_publicly_accessible,
)
check = glue_data_catalogs_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Glue Data Catalog is not publicly accessible."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v2)
def test_glue_data_catalog_public_policy(self):
client("glue", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible.glue_client",
new=Glue(aws_provider),
):
# Test Check
from prowler.providers.aws.services.glue.glue_data_catalogs_not_publicly_accessible.glue_data_catalogs_not_publicly_accessible import (
glue_data_catalogs_not_publicly_accessible,
)
check = glue_data_catalogs_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Glue Data Catalog is publicly accessible due to its resource policy."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert (
result[0].resource_arn
== f"arn:aws:glue:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:data-catalog"
)
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -116,6 +116,10 @@ def mock_make_api_call(self, operation_name, kwarg):
"test_key": "test_value",
},
}
elif operation_name == "GetResourcePolicy":
return {
"PolicyInJson": '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"secretsmanager:GetSecretValue","Resource":"*"}]}',
}
return make_api_call(self, operation_name, kwarg)
@@ -201,12 +205,22 @@ class Test_Glue_Service:
def test_get_data_catalog_encryption_settings(self):
aws_provider = set_mocked_aws_provider()
glue = Glue(aws_provider)
assert len(glue.catalog_encryption_settings) == 1
assert glue.catalog_encryption_settings[0].mode == "SSE-KMS"
assert glue.catalog_encryption_settings[0].kms_id == "kms_key"
assert glue.catalog_encryption_settings[0].password_encryption
assert glue.catalog_encryption_settings[0].password_kms_id == "password_key"
assert glue.catalog_encryption_settings[0].region == AWS_REGION_US_EAST_1
assert glue.data_catalogs[AWS_REGION_US_EAST_1].encryption_settings
assert (
glue.data_catalogs[AWS_REGION_US_EAST_1].encryption_settings.mode
== "SSE-KMS"
)
assert (
glue.data_catalogs[AWS_REGION_US_EAST_1].encryption_settings.kms_id
== "kms_key"
)
assert glue.data_catalogs[
AWS_REGION_US_EAST_1
].encryption_settings.password_encryption
assert (
glue.data_catalogs[AWS_REGION_US_EAST_1].encryption_settings.password_kms_id
== "password_key"
)
# Test Glue Get Dev Endpoints
@mock_aws
@@ -265,3 +279,19 @@ class Test_Glue_Service:
assert glue.dev_endpoints[0].tags == [{"test_key": "test_value"}]
assert glue.jobs[0].tags == [{"test_key": "test_value"}]
@mock_aws
def test_get_resource_policy(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
glue = Glue(aws_provider)
assert glue.data_catalogs[AWS_REGION_US_EAST_1].policy == {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "secretsmanager:GetSecretValue",
"Resource": "*",
}
],
}

View File

@@ -0,0 +1,113 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.secretsmanager.secretsmanager_service import (
SecretsManager,
)
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
class Test_secretsmanager_not_publicly_accessible:
def test_no_secrets(self):
client("secretsmanager", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible.secretsmanager_client",
new=SecretsManager(aws_provider),
):
# Test Check
from prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible import (
secretsmanager_not_publicly_accessible,
)
check = secretsmanager_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_secret_not_public_policy(self):
secretsmanager_client = client(
"secretsmanager", region_name=AWS_REGION_EU_WEST_1
)
secret = secretsmanager_client.create_secret(
Name="test-secret-no-public-policy",
)
secretsmanager_client.put_resource_policy(
SecretId=secret["ARN"],
ResourcePolicy='{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"arn:aws:iam::123456789012:root","Action":"secretsmanager:GetSecretValue","Resource":"*"}]}',
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible.secretsmanager_client",
new=SecretsManager(aws_provider),
):
# Test Check
from prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible import (
secretsmanager_not_publicly_accessible,
)
check = secretsmanager_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == secret["Name"]
assert result[0].resource_arn == secret["ARN"]
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SecretsManager secret {secret['Name']} is not publicly accessible."
)
@mock_aws
def test_secret_public_policy(self):
secretsmanager_client = client(
"secretsmanager", region_name=AWS_REGION_EU_WEST_1
)
secret = secretsmanager_client.create_secret(
Name="test-secret-public-policy",
)
secretsmanager_client.put_resource_policy(
SecretId=secret["ARN"],
ResourcePolicy='{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"secretsmanager:GetSecretValue","Resource":"*"}]}',
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible.secretsmanager_client",
new=SecretsManager(aws_provider),
):
# Test Check
from prowler.providers.aws.services.secretsmanager.secretsmanager_not_publicly_accessible.secretsmanager_not_publicly_accessible import (
secretsmanager_not_publicly_accessible,
)
check = secretsmanager_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == secret["Name"]
assert result[0].resource_arn == secret["ARN"]
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"SecretsManager secret {secret['Name']} is publicly accessible due to its resource policy."
)

View File

@@ -140,3 +140,34 @@ class Test_SecretsManager_Service:
assert secretsmanager.secrets[secret_arn].tags == [
{"Key": "test", "Value": "test"},
]
@mock_aws
def test_get_resource_policy(self):
secretsmanager_client = client(
"secretsmanager", region_name=AWS_REGION_EU_WEST_1
)
secret = secretsmanager_client.create_secret(
Name="test-secret-policy",
)
secretsmanager_client.put_resource_policy(
SecretId=secret["ARN"],
ResourcePolicy='{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"secretsmanager:GetSecretValue","Resource":"*"}]}',
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
secretsmanager = SecretsManager(aws_provider)
assert len(secretsmanager.secrets) == 1
assert secretsmanager.secrets[secret["ARN"]].name == "test-secret-policy"
assert secretsmanager.secrets[secret["ARN"]].arn == secret["ARN"]
assert secretsmanager.secrets[secret["ARN"]].region == AWS_REGION_EU_WEST_1
assert secretsmanager.secrets[secret["ARN"]].policy == {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "secretsmanager:GetSecretValue",
"Resource": "*",
}
],
}

View File

@@ -0,0 +1,142 @@
from unittest import mock
import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.ses.ses_service import SES
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListEmailIdentities":
return {
"EmailIdentities": [
{
"IdentityType": "DOMAIN",
"IdentityName": "test-email-identity-not-public",
}
],
}
elif operation_name == "GetEmailIdentity":
return {
"Policies": {
"policy1": '{"policy1": "value1"}',
},
"Tags": {"tag1": "value1", "tag2": "value2"},
}
return make_api_call(self, operation_name, kwarg)
def mock_make_api_call_v2(self, operation_name, kwarg):
if operation_name == "ListEmailIdentities":
return {
"EmailIdentities": [
{
"IdentityType": "EMAIL_ADDRESS",
"IdentityName": "test-email-identity-public",
}
],
}
elif operation_name == "GetEmailIdentity":
return {
"Policies": {
"policy1": '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":"*","Action":"ses:SendEmail","Resource":"*"}]}',
},
"Tags": {"tag1": "value1", "tag2": "value2"},
}
return make_api_call(self, operation_name, kwarg)
class Test_ses_identities_not_publicly_accessible:
@mock_aws
def test_no_identities(self):
client("sesv2", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible.ses_client",
new=SES(aws_provider),
):
from prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible import (
ses_identity_not_publicly_accessible,
)
check = ses_identity_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_email_identity_not_public(self):
client("sesv2", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible.ses_client",
new=SES(aws_provider),
):
from prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible import (
ses_identity_not_publicly_accessible,
)
check = ses_identity_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "SES identity test-email-identity-not-public is not publicly accessible."
)
assert result[0].resource_id == "test-email-identity-not-public"
assert (
result[0].resource_arn
== f"arn:aws:ses:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:identity/test-email-identity-not-public"
)
assert result[0].resource_tags == {"tag1": "value1", "tag2": "value2"}
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call_v2)
def test_email_identity_public(self):
client("sesv2", region_name=AWS_REGION_EU_WEST_1)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible.ses_client",
new=SES(aws_provider),
):
from prowler.providers.aws.services.ses.ses_identity_not_publicly_accessible.ses_identity_not_publicly_accessible import (
ses_identity_not_publicly_accessible,
)
check = ses_identity_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "SES identity test-email-identity-public is publicly accessible due to its resource policy."
)
assert result[0].resource_id == "test-email-identity-public"
assert (
result[0].resource_arn
== f"arn:aws:ses:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:identity/test-email-identity-public"
)
assert result[0].resource_tags == {"tag1": "value1", "tag2": "value2"}
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -0,0 +1,80 @@
from unittest.mock import patch
import botocore
from moto import mock_aws
from prowler.providers.aws.services.ses.ses_service import SES
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListEmailIdentities":
return {
"EmailIdentities": [
{
"IdentityType": "EMAIL_ADDRESS",
"IdentityName": "test-email-identity",
}
],
}
elif operation_name == "GetEmailIdentity":
return {
"Policies": {
"policy1": '{"policy1": "value1"}',
},
"Tags": {"tag1": "value1", "tag2": "value2"},
}
return make_api_call(self, operation_name, kwarg)
def mock_generate_regional_clients(provider, service):
regional_client = provider._session.current_session.client(
service, region_name=AWS_REGION_EU_WEST_1
)
regional_client.region = AWS_REGION_EU_WEST_1
return {AWS_REGION_EU_WEST_1: regional_client}
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_SES_Service:
# Test SES Service
def test_service(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ses = SES(aws_provider)
assert ses.service == "sesv2"
# Test SES client
def test_client(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ses = SES(aws_provider)
for reg_client in ses.regional_clients.values():
assert reg_client.__class__.__name__ == "SESV2"
# Test SES session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ses = SES(aws_provider)
assert ses.session.__class__.__name__ == "Session"
@mock_aws
# Test SES list queues
def test_list_identities(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ses = SES(aws_provider)
arn = f"arn:aws:ses:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:identity/test-email-identity"
assert ses.email_identities[arn].name == "test-email-identity"
assert ses.email_identities[arn].type == "EMAIL_ADDRESS"
assert ses.email_identities[arn].arn == arn
assert ses.email_identities[arn].region == AWS_REGION_EU_WEST_1
assert ses.email_identities[arn].policy == {"policy1": "value1"}
assert ses.email_identities[arn].tags == {"tag1": "value1", "tag2": "value2"}

View File

@@ -62,6 +62,8 @@ regions_by_service["services"]["wafv2"] = regions_by_service["services"]["waf"]
regions_by_service["services"]["wellarchitected"] = regions_by_service["services"][
"wellarchitectedtool"
]
# sesv2 --> ses
regions_by_service["services"]["sesv2"] = regions_by_service["services"]["ses"]
# Write to file
parsed_matrix_regions_aws = f"{os.path.dirname(os.path.realpath(__name__))}/prowler/providers/aws/aws_regions_by_service.json"