Compare commits

...

5 Commits

30 changed files with 1352 additions and 668 deletions

View File

@@ -13,142 +13,145 @@ class OpenSearchService(AWSService):
def __init__(self, provider): def __init__(self, provider):
# Call AWSService's __init__ # Call AWSService's __init__
super().__init__("opensearch", provider) super().__init__("opensearch", provider)
self.opensearch_domains = [] self.opensearch_domains = {}
self.__threading_call__(self._list_domain_names) self.__threading_call__(self._list_domain_names)
self._describe_domain_config(self.regional_clients) self.__threading_call__(
self._describe_domain(self.regional_clients) self._describe_domain_config, self.opensearch_domains.values()
self._list_tags() )
self.__threading_call__(self._describe_domain, self.opensearch_domains.values())
self.__threading_call__(self._list_tags, self.opensearch_domains.values())
def _list_domain_names(self, regional_client): def _list_domain_names(self, regional_client):
logger.info("OpenSearch - listing domain names...") logger.info("OpenSearch - listing domain names...")
try: try:
domains = regional_client.list_domain_names() domains = regional_client.list_domain_names()
for domain in domains["DomainNames"]: for domain in domains["DomainNames"]:
arn = f"arn:{self.audited_partition}:opensearch:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}" arn = f"arn:{self.audited_partition}:es:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}"
if not self.audit_resources or ( if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources) is_resource_filtered(arn, self.audit_resources)
): ):
self.opensearch_domains.append( self.opensearch_domains[arn] = OpenSearchDomain(
OpenSearchDomain( arn=arn,
arn=arn, name=domain["DomainName"],
name=domain["DomainName"], region=regional_client.region,
region=regional_client.region,
)
) )
except Exception as error: except Exception as error:
logger.error( logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
) )
def _describe_domain_config(self, regional_clients): def _describe_domain_config(self, domain):
logger.info("OpenSearch - describing domain configurations...") logger.info("OpenSearch - describing domain configurations...")
try: try:
for domain in self.opensearch_domains: regional_client = self.regional_clients[domain.region]
regional_client = regional_clients[domain.region] describe_domain = regional_client.describe_domain_config(
describe_domain = regional_client.describe_domain_config( DomainName=domain.name
DomainName=domain.name )
) for logging_key in [
for logging_key in [ "SEARCH_SLOW_LOGS",
"SEARCH_SLOW_LOGS", "INDEX_SLOW_LOGS",
"INDEX_SLOW_LOGS", "AUDIT_LOGS",
"AUDIT_LOGS", ]:
]: if logging_key in describe_domain["DomainConfig"].get(
if logging_key in describe_domain["DomainConfig"].get( "LogPublishingOptions", {}
"LogPublishingOptions", {} ).get("Options", {}):
).get("Options", {}): domain.logging.append(
domain.logging.append( PublishingLoggingOption(
PublishingLoggingOption( name=logging_key,
name=logging_key, enabled=describe_domain["DomainConfig"][
enabled=describe_domain["DomainConfig"][ "LogPublishingOptions"
"LogPublishingOptions" ]["Options"][logging_key]["Enabled"],
]["Options"][logging_key]["Enabled"],
)
) )
try:
domain.access_policy = loads(
describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
) )
except JSONDecodeError as error:
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_domain(self, regional_clients):
logger.info("OpenSearch - describing domain configurations...")
try:
for domain in self.opensearch_domains:
regional_client = regional_clients[domain.region]
describe_domain = regional_client.describe_domain(
DomainName=domain.name
)
domain.arn = describe_domain["DomainStatus"]["ARN"]
domain.vpc_endpoints = None
if "Endpoints" in describe_domain["DomainStatus"]:
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
domain.vpc_endpoints = [
vpc
for vpc in describe_domain["DomainStatus"][
"Endpoints"
].values()
]
domain.vpc_id = None
if "VPCOptions" in describe_domain["DomainStatus"]:
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"][
"VPCId"
]
domain.cognito_options = describe_domain["DomainStatus"][
"CognitoOptions"
]["Enabled"]
domain.encryption_at_rest = describe_domain["DomainStatus"][
"EncryptionAtRestOptions"
]["Enabled"]
domain.node_to_node_encryption = describe_domain["DomainStatus"][
"NodeToNodeEncryptionOptions"
]["Enabled"]
domain.enforce_https = describe_domain["DomainStatus"][
"DomainEndpointOptions"
]["EnforceHTTPS"]
domain.internal_user_database = describe_domain["DomainStatus"][
"AdvancedSecurityOptions"
]["InternalUserDatabaseEnabled"]
domain.saml_enabled = (
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
.get("SAMLOptions", {})
.get("Enabled", False)
)
domain.update_available = describe_domain["DomainStatus"][
"ServiceSoftwareOptions"
]["UpdateAvailable"]
domain.version = describe_domain["DomainStatus"]["EngineVersion"]
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
"AdvancedSecurityOptions"
]["Enabled"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_tags(self):
logger.info("OpenSearch - List Tags...")
for domain in self.opensearch_domains:
try: try:
regional_client = self.regional_clients[domain.region] domain.access_policy = loads(
response = regional_client.list_tags( describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
ARN=domain.arn, )
)["TagList"] except JSONDecodeError as error:
domain.tags = response logger.warning(
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
) )
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_domain(self, domain):
logger.info("OpenSearch - describing domain configurations...")
try:
regional_client = self.regional_clients[domain.region]
describe_domain = regional_client.describe_domain(DomainName=domain.name)
domain.arn = describe_domain["DomainStatus"]["ARN"]
domain.vpc_endpoints = None
if "Endpoints" in describe_domain["DomainStatus"]:
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
domain.vpc_endpoints = [
vpc
for vpc in describe_domain["DomainStatus"]["Endpoints"].values()
]
domain.vpc_id = None
if "VPCOptions" in describe_domain["DomainStatus"]:
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"].get(
"VPCId", None
)
domain.cognito_options = describe_domain["DomainStatus"][
"CognitoOptions"
].get("Enabled", False)
domain.encryption_at_rest = describe_domain["DomainStatus"][
"EncryptionAtRestOptions"
].get("Enabled", False)
domain.node_to_node_encryption = describe_domain["DomainStatus"][
"NodeToNodeEncryptionOptions"
].get("Enabled", False)
domain.enforce_https = describe_domain["DomainStatus"][
"DomainEndpointOptions"
].get("EnforceHTTPS", False)
domain.internal_user_database = describe_domain["DomainStatus"][
"AdvancedSecurityOptions"
].get("InternalUserDatabaseEnabled", False)
domain.saml_enabled = (
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
.get("SAMLOptions", {})
.get("Enabled", False)
)
domain.update_available = (
describe_domain["DomainStatus"]
.get("ServiceSoftwareOptions", {"UpdateAvailable": False})
.get("UpdateAvailable", False)
)
domain.version = describe_domain["DomainStatus"].get("EngineVersion", None)
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
"AdvancedSecurityOptions"
].get("Enabled", False)
domain.instance_count = describe_domain["DomainStatus"][
"ClusterConfig"
].get("InstanceCount", None)
domain.zone_awareness_enabled = describe_domain["DomainStatus"][
"ClusterConfig"
].get("ZoneAwarenessEnabled", False)
domain.dedicated_master_count = describe_domain["DomainStatus"][
"ClusterConfig"
].get("DedicatedMasterCount", None)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_tags(self, domain):
logger.info("OpenSearch - List Tags...")
try:
regional_client = self.regional_clients[domain.region]
response = regional_client.list_tags(
ARN=domain.arn,
)["TagList"]
domain.tags = response
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class PublishingLoggingOption(BaseModel): class PublishingLoggingOption(BaseModel):
name: str name: str
@@ -171,5 +174,8 @@ class OpenSearchDomain(BaseModel):
saml_enabled: bool = None saml_enabled: bool = None
update_available: bool = None update_available: bool = None
version: str = None version: str = None
instance_count: Optional[int]
zone_awareness_enabled: Optional[bool]
dedicated_master_count: Optional[int]
tags: Optional[list] = [] tags: Optional[list] = []
advanced_settings_enabled: bool = None advanced_settings_enabled: bool = None

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_access_control_enabled(Check): class opensearch_service_domains_access_control_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "opensearch_service_domains_at_least_three_master_nodes",
"CheckTitle": "Elasticsearch domains should be configured with at least three dedicated master nodes",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "elasticsearch",
"SubServiceName": "domain",
"ResourceIdTemplate": "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
"Severity": "medium",
"ResourceType": "AwsElasticsearchDomain",
"Description": "This control checks whether Elasticsearch domains are configured with at least three dedicated primary nodes. This control fails if the domain does not use dedicated primary nodes. Using more than three primary nodes may not provide significant additional availability benefits, while incurring extra costs.",
"Risk": "Without at least three dedicated master nodes, the Elasticsearch domain's fault-tolerance and ability to handle cluster management operations during node failures may be compromised, leading to potential data unavailability.",
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
"Remediation": {
"Code": {
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config DedicatedMasterEnabled=true,MasterInstanceCount=3",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-7",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure Elasticsearch domains with at least three dedicated master nodes for high availability and cluster fault tolerance.",
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
}
},
"Categories": [
"redundancy"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.opensearch.opensearch_client import (
opensearch_client,
)
class opensearch_service_domains_at_least_three_master_nodes(Check):
def execute(self):
findings = []
for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata())
report.region = domain.region
report.resource_id = domain.name
report.resource_arn = domain.arn
report.resource_tags = domain.tags
report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} has only {domain.dedicated_master_count} master nodes."
if domain.dedicated_master_count >= 3:
report.status = "PASS"
report.status_extended = f"Opensearch domain {domain.name} has {domain.dedicated_master_count} master nodes."
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled",
"CheckTitle": "Elasticsearch/Opensearch domains should have at least three data nodes",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "opensearch",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:{region}:{account-id}:domain/{domain-name}",
"Severity": "medium",
"ResourceType": "AwsElasticsearchDomain",
"Description": "This control checks whether Elasticsearch/Opensearch domains are configured with at least three data nodes and zoneAwarenessEnabled is true.",
"Risk": "Without at least three data nodes, the Elasticsearch/Opensearch domain may not be fault-tolerant, leading to a higher risk of data loss or unavailability in case of node failure.",
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
"Remediation": {
"Code": {
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config InstanceCount=3,ZoneAwarenessEnabled=true",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-6",
"Terraform": ""
},
"Recommendation": {
"Text": "Modify the Elasticsearch/Opensearch domain to ensure at least three data nodes and enable zone awareness for high availability.",
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
}
},
"Categories": [
"redundancy"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,33 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.opensearch.opensearch_client import (
opensearch_client,
)
class opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled(Check):
def execute(self):
findings = []
for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata())
report.region = domain.region
report.resource_id = domain.name
report.resource_arn = domain.arn
report.resource_tags = domain.tags
report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
if domain.instance_count >= 3 and domain.zone_awareness_enabled:
report.status = "PASS"
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes and zone awareness enabled."
elif domain.instance_count >= 3 and not domain.zone_awareness_enabled:
report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes, but zone awareness is not enabled."
elif domain.instance_count < 3 and domain.zone_awareness_enabled:
report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} has zone awareness enabled, but only {domain.instance_count} data nodes."
findings.append(report)
return findings

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_audit_logging_enabled(Check): class opensearch_service_domains_audit_logging_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_cloudwatch_logging_enabled(Check): class opensearch_service_domains_cloudwatch_logging_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_encryption_at_rest_enabled(Check): class opensearch_service_domains_encryption_at_rest_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_https_communications_enforced(Check): class opensearch_service_domains_https_communications_enforced(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_internal_user_database_enabled(Check): class opensearch_service_domains_internal_user_database_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_node_to_node_encryption_enabled(Check): class opensearch_service_domains_node_to_node_encryption_enabled(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_not_publicly_accessible(Check): class opensearch_service_domains_not_publicly_accessible(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -7,14 +7,14 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_updated_to_the_latest_service_software_version(Check): class opensearch_service_domains_updated_to_the_latest_service_software_version(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name
report.resource_arn = domain.arn report.resource_arn = domain.arn
report.resource_tags = domain.tags report.resource_tags = domain.tags
report.status = "PASS" report.status = "PASS"
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available." report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available."
if domain.update_available: if domain.update_available:
report.status = "FAIL" report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} has internal updates available." report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} has internal updates available."

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_use_cognito_authentication_for_kibana(Check): class opensearch_service_domains_use_cognito_authentication_for_kibana(Check):
def execute(self): def execute(self):
findings = [] findings = []
for domain in opensearch_client.opensearch_domains: for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata()) report = Check_Report_AWS(self.metadata())
report.region = domain.region report.region = domain.region
report.resource_id = domain.name report.resource_id = domain.name

View File

@@ -1,26 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_access_control_enabled: class Test_opensearch_service_domains_access_control_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
opensearch_service_domains_access_control_enabled, opensearch_service_domains_access_control_enabled,
@@ -30,22 +36,26 @@ class Test_opensearch_service_domains_access_control_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_fine_grained_access_enabled(self): def test_no_fine_grained_access_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain",
OpenSearchDomain( AdvancedSecurityOptions={"Enabled": False},
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
)
) )
opensearch_client.opensearch_domains[0].advanced_settings_enabled = False
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
opensearch_service_domains_access_control_enabled, opensearch_service_domains_access_control_enabled,
@@ -55,29 +65,36 @@ class Test_opensearch_service_domains_access_control_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"does not have fine grained access control enabled.", f"Opensearch domain {domain["DomainStatus"]["DomainName"]} does not have fine grained access control enabled."
result[0].status_extended, == result[0].status_extended
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
def test_logging_AUDIT_LOGS_enabled(self): @mock_aws
opensearch_client = mock.MagicMock def test_fine_grained_access_enabled(self):
opensearch_client.opensearch_domains = [] opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains.append( domain = opensearch_client.create_domain(
OpenSearchDomain( DomainName="test-domain",
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn AdvancedSecurityOptions={"Enabled": True},
)
) )
opensearch_client.opensearch_domains[0].advanced_settings_enabled = True
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
opensearch_service_domains_access_control_enabled, opensearch_service_domains_access_control_enabled,
@@ -87,8 +104,12 @@ class Test_opensearch_service_domains_access_control_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert (
"has fine grained access control enabled.", result[0].status_extended f"Opensearch domain {domain["DomainStatus"]["DomainName"]} has fine grained access control enabled."
== result[0].status_extended
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn

View File

@@ -0,0 +1,115 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_at_least_three_master_nodes:
@mock_aws
def test_no_domains(self):
client("opensearch", region_name=AWS_REGION_US_EAST_1)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_less_than_three_master_nodes(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-2-nodes",
ClusterConfig={"DedicatedMasterCount": 2},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Opensearch domain test-domain-2-nodes has only 2 master nodes."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_at_least_three_master_nodes(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-3-nodes",
ClusterConfig={"DedicatedMasterCount": 3},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Opensearch domain test-domain-3-nodes has 3 master nodes."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -0,0 +1,203 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled:
@mock_aws
def test_no_domains(self):
client("opensearch", region_name=AWS_REGION_US_EAST_1)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
)
check = (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
)
result = check.execute()
assert len(result) == 0
@mock_aws
def test_instance_count_less_than_three_zoneawareness_enabled(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain",
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 2},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
)
check = (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Opensearch domain test-domain has zone awareness enabled, but only 2 data nodes."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_instance_count_more_than_three_zoneawareness_not_enabled(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain",
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 3},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
)
check = (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Opensearch domain test-domain has 3 data nodes, but zone awareness is not enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_instance_count_less_than_three_zoneawareness_not_enabled(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain",
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 2},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
)
check = (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Opensearch domain test-domain does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_logging_instance_count_more_than_three_zoneawareness_enabled(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain",
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 3},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
)
check = (
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Opensearch domain test-domain has 3 data nodes and zone awareness enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -1,27 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
PublishingLoggingOption,
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_audit_logging_enabled: class Test_opensearch_service_domains_audit_logging_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
opensearch_service_domains_audit_logging_enabled, opensearch_service_domains_audit_logging_enabled,
@@ -31,22 +36,29 @@ class Test_opensearch_service_domains_audit_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_logging_enabled(self): def test_no_logging_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-logging",
OpenSearchDomain( LogPublishingOptions={
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn "SEARCH_SLOW_LOGS": {
) "CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*"
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
opensearch_service_domains_audit_logging_enabled, opensearch_service_domains_audit_logging_enabled,
@@ -56,29 +68,40 @@ class Test_opensearch_service_domains_audit_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search("AUDIT_LOGS disabled", result[0].status_extended) assert (
assert result[0].resource_id == domain_name result[0].status_extended
assert result[0].resource_arn == domain_arn == "Opensearch domain test-domain-no-logging AUDIT_LOGS disabled."
def test_logging_AUDIT_LOGS_enabled(self):
opensearch_client = mock.MagicMock
opensearch_client.opensearch_domains = []
opensearch_client.opensearch_domains.append(
OpenSearchDomain(
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
) )
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_logging_AUDIT_LOGS_enabled(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-logging",
LogPublishingOptions={
"AUDIT_LOGS": {
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:audit-logs:*",
"Enabled": True,
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
opensearch_client.opensearch_domains[0].logging.append( OpenSearchService,
PublishingLoggingOption(name="AUDIT_LOGS", enabled=True)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
opensearch_service_domains_audit_logging_enabled, opensearch_service_domains_audit_logging_enabled,
@@ -88,6 +111,12 @@ class Test_opensearch_service_domains_audit_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search("AUDIT_LOGS enabled", result[0].status_extended) assert (
assert result[0].resource_id == domain_name result[0].status_extended
assert result[0].resource_arn == domain_arn == "Opensearch domain test-domain-logging AUDIT_LOGS enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -1,27 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
PublishingLoggingOption,
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_cloudwatch_logging_enabled: class Test_opensearch_service_domains_cloudwatch_logging_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
opensearch_service_domains_cloudwatch_logging_enabled, opensearch_service_domains_cloudwatch_logging_enabled,
@@ -31,22 +36,30 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_logging_enabled(self): def test_no_logging_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-logging",
OpenSearchDomain( LogPublishingOptions={
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn "AUDIT_LOGS": {
) "CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
"Enabled": True,
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
opensearch_service_domains_cloudwatch_logging_enabled, opensearch_service_domains_cloudwatch_logging_enabled,
@@ -56,32 +69,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain-no-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_logging_SEARCH_SLOW_LOGS_enabled(self): def test_logging_SEARCH_SLOW_LOGS_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-index-logging",
OpenSearchDomain( LogPublishingOptions={
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn "SEARCH_SLOW_LOGS": {
) "CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
"Enabled": True,
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
opensearch_client.opensearch_domains[0].logging.append( OpenSearchService,
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
opensearch_service_domains_cloudwatch_logging_enabled, opensearch_service_domains_cloudwatch_logging_enabled,
@@ -91,32 +112,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain-no-index-logging SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_logging_INDEX_SLOW_LOGS_enabled(self): def test_logging_INDEX_SLOW_LOGS_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-search-logging",
OpenSearchDomain( LogPublishingOptions={
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn "INDEX_SLOW_LOGS": {
) "CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
"Enabled": True,
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
opensearch_client.opensearch_domains[0].logging.append( OpenSearchService,
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
opensearch_service_domains_cloudwatch_logging_enabled, opensearch_service_domains_cloudwatch_logging_enabled,
@@ -126,34 +155,44 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain-no-search-logging INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_logging_INDEX_SLOW_LOGS_and_SEARCH_SLOW_LOGS_enabled(self): def test_logging_INDEX_SLOW_LOGS_and_SEARCH_SLOW_LOGS_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-logging",
OpenSearchDomain( LogPublishingOptions={
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn "SEARCH_SLOW_LOGS": {
) "CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
"Enabled": True,
},
"INDEX_SLOW_LOGS": {
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
"Enabled": True,
},
},
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
logging_options = [ OpenSearchService,
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True), )
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True),
] mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
opensearch_client.opensearch_domains[0].logging.extend(logging_options)
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
opensearch_service_domains_cloudwatch_logging_enabled, opensearch_service_domains_cloudwatch_logging_enabled,
@@ -163,9 +202,12 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert (
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn

View File

@@ -1,26 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_encryption_at_rest_enabled: class Test_opensearch_service_domains_encryption_at_rest_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
opensearch_service_domains_encryption_at_rest_enabled, opensearch_service_domains_encryption_at_rest_enabled,
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_encryption_at_rest_enabled(self): def test_no_encryption_at_rest_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-encryption",
OpenSearchDomain( EncryptionAtRestOptions={"Enabled": False},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
encryption_at_rest=False,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
opensearch_service_domains_encryption_at_rest_enabled, opensearch_service_domains_encryption_at_rest_enabled,
@@ -58,31 +64,35 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"does not have encryption at-rest enabled", result[0].status_extended result[0].status_extended
== "Opensearch domain test-domain-no-encryption does not have encryption at-rest enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_encryption_at_rest_enabled(self): def test_encryption_at_rest_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-encryption-at-rest",
OpenSearchDomain( EncryptionAtRestOptions={"Enabled": True},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
encryption_at_rest=True,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
opensearch_service_domains_encryption_at_rest_enabled, opensearch_service_domains_encryption_at_rest_enabled,
@@ -92,6 +102,12 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search("has encryption at-rest enabled", result[0].status_extended) assert (
assert result[0].resource_id == domain_name result[0].status_extended
assert result[0].resource_arn == domain_arn == "Opensearch domain test-domain-encryption-at-rest has encryption at-rest enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -1,26 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_https_communications_enforced: class Test_opensearch_service_domains_https_communications_enforced:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
opensearch_service_domains_https_communications_enforced, opensearch_service_domains_https_communications_enforced,
@@ -30,25 +36,27 @@ class Test_opensearch_service_domains_https_communications_enforced:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_https_enabled(self): def test_no_https_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-https",
OpenSearchDomain( DomainEndpointOptions={
name=domain_name, "EnforceHTTPS": False,
region=AWS_REGION_EU_WEST_1, },
arn=domain_arn,
enforce_https=False,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
opensearch_service_domains_https_communications_enforced, opensearch_service_domains_https_communications_enforced,
@@ -58,31 +66,37 @@ class Test_opensearch_service_domains_https_communications_enforced:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"does not have enforce HTTPS enabled", result[0].status_extended result[0].status_extended
== "Opensearch domain test-domain-no-https does not have enforce HTTPS enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_https_enabled(self): def test_https_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-https",
OpenSearchDomain( DomainEndpointOptions={
name=domain_name, "EnforceHTTPS": True,
region=AWS_REGION_EU_WEST_1, },
arn=domain_arn,
enforce_https=True,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
opensearch_service_domains_https_communications_enforced, opensearch_service_domains_https_communications_enforced,
@@ -92,6 +106,12 @@ class Test_opensearch_service_domains_https_communications_enforced:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search("has enforce HTTPS enabled", result[0].status_extended) assert (
assert result[0].resource_id == domain_name result[0].status_extended
assert result[0].resource_arn == domain_arn == "Opensearch domain test-domain-https has enforce HTTPS enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -1,26 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_internal_user_database_enabled: class Test_opensearch_service_domains_internal_user_database_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
opensearch_service_domains_internal_user_database_enabled, opensearch_service_domains_internal_user_database_enabled,
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_internal_database_disabled(self): def test_internal_database_disabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain",
OpenSearchDomain( AdvancedSecurityOptions={"InternalUserDatabaseEnabled": False},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
internal_user_database=False,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
opensearch_service_domains_internal_user_database_enabled, opensearch_service_domains_internal_user_database_enabled,
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert (
"does not have internal user database enabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain does not have internal user database enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_internal_database_enabled(self): def test_internal_database_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain",
OpenSearchDomain( AdvancedSecurityOptions={"InternalUserDatabaseEnabled": True},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
internal_user_database=True,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
opensearch_service_domains_internal_user_database_enabled, opensearch_service_domains_internal_user_database_enabled,
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"has internal user database enabled", result[0].status_extended result[0].status_extended
== "Opensearch domain test-domain has internal user database enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn

View File

@@ -1,26 +1,32 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_node_to_node_encryption_enabled: class Test_opensearch_service_domains_node_to_node_encryption_enabled:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
opensearch_service_domains_node_to_node_encryption_enabled, opensearch_service_domains_node_to_node_encryption_enabled,
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_no_encryption_enabled(self): def test_no_encryption_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-encryption",
OpenSearchDomain( NodeToNodeEncryptionOptions={"Enabled": False},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
node_to_node_encryption=False,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
opensearch_service_domains_node_to_node_encryption_enabled, opensearch_service_domains_node_to_node_encryption_enabled,
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search( assert (
"does not have node-to-node encryption enabled", result[0].status_extended
result[0].status_extended, == "Opensearch domain test-domain-no-encryption does not have node-to-node encryption enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
@mock_aws
def test_encryption_enabled(self): def test_encryption_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-encryption",
OpenSearchDomain( NodeToNodeEncryptionOptions={"Enabled": True},
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
node_to_node_encryption=True,
)
) )
opensearch_client.opensearch_domains[0].logging = [] from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
opensearch_service_domains_node_to_node_encryption_enabled, opensearch_service_domains_node_to_node_encryption_enabled,
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert search( assert (
"has node-to-node encryption enabled", result[0].status_extended result[0].status_extended
== "Opensearch domain test-domain-encryption has node-to-node encryption enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn

View File

@@ -306,7 +306,7 @@ class Test_opensearch_service_domains_not_publicly_accessible:
def test_domain_inside_vpc(self): def test_domain_inside_vpc(self):
opensearch_client = mock.MagicMock opensearch_client = mock.MagicMock
opensearch_client.opensearch_domains = [] opensearch_client.opensearch_domains = {}
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2]) aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
@@ -324,13 +324,12 @@ class Test_opensearch_service_domains_not_publicly_accessible:
opensearch_service_domains_not_publicly_accessible, opensearch_service_domains_not_publicly_accessible,
) )
opensearch_client.opensearch_domains.append( domain_arn = f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
OpenSearchDomain( opensearch_client.opensearch_domains[domain_arn] = OpenSearchDomain(
name=domain_name, name=domain_name,
region=AWS_REGION_US_WEST_2, region=AWS_REGION_US_WEST_2,
arn=f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}", arn=domain_arn,
vpc_id="vpc-123456", vpc_id="vpc-123456",
)
) )
check = opensearch_service_domains_not_publicly_accessible() check = opensearch_service_domains_not_publicly_accessible()

View File

@@ -1,26 +1,76 @@
from re import search
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( import botocore
OpenSearchDomain, from boto3 import client
) from moto import mock_aws
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListDomainNames":
return {
"DomainNames": [
{
"DomainName": "test-domain-updates",
},
]
}
if operation_name == "DescribeDomain":
return {
"DomainStatus": {
"DomainName": "test-domain-updates",
"EngineVersion": "OpenSearch2.0",
"ServiceSoftwareOptions": {
"UpdateAvailable": True,
},
"ARN": f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates",
"ClusterConfig": {
"InstanceCount": 1,
},
"AdvancedSecurityOptions": {
"InternalUserDatabaseEnabled": False,
},
"CognitoOptions": {
"Enabled": False,
},
"EncryptionAtRestOptions": {
"Enabled": False,
},
"NodeToNodeEncryptionOptions": {
"Enabled": False,
},
"DomainEndpointOptions": {
"EnforceHTTPS": False,
},
}
}
return make_api_call(self, operation_name, kwarg)
class Test_opensearch_service_domains_updated_to_the_latest_service_software_version: class Test_opensearch_service_domains_updated_to_the_latest_service_software_version:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
opensearch_service_domains_updated_to_the_latest_service_software_version, opensearch_service_domains_updated_to_the_latest_service_software_version,
@@ -32,61 +82,22 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
def test_internal_update_available(self): @mock_aws
opensearch_client = mock.MagicMock @mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
opensearch_client.opensearch_domains = [] def test_updates_available(self):
opensearch_client.opensearch_domains.append( client("opensearch", region_name=AWS_REGION_US_EAST_1)
OpenSearchDomain( from prowler.providers.aws.services.opensearch.opensearch_service import (
name=domain_name, OpenSearchService,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
update_available=False,
)
) )
opensearch_client.opensearch_domains[0].logging = []
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
new=opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
opensearch_service_domains_updated_to_the_latest_service_software_version,
)
check = (
opensearch_service_domains_updated_to_the_latest_service_software_version()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"does not have internal updates available", result[0].status_extended
)
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
def test_internal_database_enabled(self):
opensearch_client = mock.MagicMock
opensearch_client.opensearch_domains = []
opensearch_client.opensearch_domains.append(
OpenSearchDomain(
name=domain_name,
region=AWS_REGION_EU_WEST_1,
arn=domain_arn,
update_available=True,
)
)
opensearch_client.opensearch_domains[0].logging = []
with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
new=opensearch_client,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
new=opensearch_client,
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
opensearch_service_domains_updated_to_the_latest_service_software_version, opensearch_service_domains_updated_to_the_latest_service_software_version,
@@ -98,6 +109,52 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
result = check.execute() result = check.execute()
assert len(result) == 1 assert len(result) == 1
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert search("has internal updates available", result[0].status_extended) assert (
assert result[0].resource_id == domain_name result[0].status_extended
assert result[0].resource_arn == domain_arn == "Opensearch domain test-domain-updates with version OpenSearch2.0 has internal updates available."
)
assert result[0].resource_id == "test-domain-updates"
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates"
)
@mock_aws
def test_no_updates_availables(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-no-updates",
SoftwareUpdateOptions={"AutoSoftwareUpdateEnabled": True},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
opensearch_service_domains_updated_to_the_latest_service_software_version,
)
check = (
opensearch_service_domains_updated_to_the_latest_service_software_version()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Opensearch domain test-domain-no-updates with version {domain["DomainStatus"]["EngineVersion"]} does not have internal updates available."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)

View File

@@ -1,25 +1,32 @@
from unittest import mock from unittest import mock
from prowler.providers.aws.services.opensearch.opensearch_service import ( from boto3 import client
OpenSearchDomain, from moto import mock_aws
)
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
domain_name = "test-domain" from tests.providers.aws.utils import (
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}" AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_use_cognito_authentication_for_kibana: class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
@mock_aws
def test_no_domains(self): def test_no_domains(self):
opensearch_client = mock.MagicMock client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = []
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
opensearch_service_domains_use_cognito_authentication_for_kibana, opensearch_service_domains_use_cognito_authentication_for_kibana,
@@ -29,25 +36,24 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
result = check.execute() result = check.execute()
assert len(result) == 0 assert len(result) == 0
@mock_aws
def test_neither_cognito_nor_saml_enabled(self): def test_neither_cognito_nor_saml_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-no-cognito-no-saml",
OpenSearchDomain( )
name=domain_name, from prowler.providers.aws.services.opensearch.opensearch_service import (
region=AWS_REGION_EU_WEST_1, OpenSearchService,
arn=domain_arn,
cognito_options=False,
saml_enabled=False,
)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
opensearch_service_domains_use_cognito_authentication_for_kibana, opensearch_service_domains_use_cognito_authentication_for_kibana,
@@ -59,31 +65,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
assert result[0].status == "FAIL" assert result[0].status == "FAIL"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Opensearch domain {domain_name} has neither Amazon Cognito nor SAML authentication for Kibana enabled." == "Opensearch domain test-domain-no-cognito-no-saml has neither Amazon Cognito nor SAML authentication for Kibana enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
assert result[0].resource_tags == []
@mock_aws
def test_cognito_enabled(self): def test_cognito_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-cognito",
OpenSearchDomain( CognitoOptions={"Enabled": True},
name=domain_name, )
region=AWS_REGION_EU_WEST_1, from prowler.providers.aws.services.opensearch.opensearch_service import (
arn=domain_arn, OpenSearchService,
cognito_options=True,
)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
opensearch_service_domains_use_cognito_authentication_for_kibana, opensearch_service_domains_use_cognito_authentication_for_kibana,
@@ -95,32 +103,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled." == "Opensearch domain test-domain-cognito has either Amazon Cognito or SAML authentication for Kibana enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
assert result[0].resource_tags == []
assert result[0].resource_arn == domain_arn
@mock_aws
def test_saml_enabled(self): def test_saml_enabled(self):
opensearch_client = mock.MagicMock opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
opensearch_client.opensearch_domains = [] domain = opensearch_client.create_domain(
opensearch_client.opensearch_domains.append( DomainName="test-domain-saml",
OpenSearchDomain( AdvancedSecurityOptions={"SAMLOptions": {"Enabled": True}},
name=domain_name, )
region=AWS_REGION_EU_WEST_1, from prowler.providers.aws.services.opensearch.opensearch_service import (
arn=domain_arn, OpenSearchService,
saml_enabled=True,
)
) )
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch( with mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService", "prowler.providers.common.provider.Provider.get_global_provider",
opensearch_client, return_value=mocked_aws_provider,
), mock.patch( ), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client", "prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
new=opensearch_client, new=OpenSearchService(mocked_aws_provider),
): ):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import ( from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
opensearch_service_domains_use_cognito_authentication_for_kibana, opensearch_service_domains_use_cognito_authentication_for_kibana,
@@ -132,10 +141,10 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
assert result[0].status == "PASS" assert result[0].status == "PASS"
assert ( assert (
result[0].status_extended result[0].status_extended
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled." == "Opensearch domain test-domain-saml has either Amazon Cognito or SAML authentication for Kibana enabled."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
) )
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
assert result[0].resource_tags == []
assert result[0].resource_arn == domain_arn

View File

@@ -2,6 +2,8 @@ from json import dumps
from unittest.mock import patch from unittest.mock import patch
import botocore import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.opensearch.opensearch_service import ( from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService, OpenSearchService,
@@ -13,7 +15,7 @@ from tests.providers.aws.utils import (
) )
test_domain_name = "test" test_domain_name = "test"
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}" domain_arn = f"arn:aws:es:eu-west-1:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}"
policy_data = { policy_data = {
"Version": "2012-10-17", "Version": "2012-10-17",
@@ -56,35 +58,6 @@ def mock_make_api_call(self, operation_name, kwarg):
}, },
} }
} }
if operation_name == "DescribeDomain":
return {
"DomainStatus": {
"ARN": domain_arn,
"Endpoints": {
"vpc": "vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com"
},
"EngineVersion": "opensearch-version1",
"VPCOptions": {
"VPCId": "test-vpc-id",
},
"CognitoOptions": {"Enabled": True},
"EncryptionAtRestOptions": {"Enabled": True},
"NodeToNodeEncryptionOptions": {"Enabled": True},
"AdvancedOptions": {"string": "string"},
"LogPublishingOptions": {
"string": {
"CloudWatchLogsLogGroupArn": "string",
"Enabled": True | False,
}
},
"ServiceSoftwareOptions": {"UpdateAvailable": True},
"DomainEndpointOptions": {"EnforceHTTPS": True},
"AdvancedSecurityOptions": {
"InternalUserDatabaseEnabled": True,
"SAMLOptions": {"Enabled": True},
},
}
}
if operation_name == "ListTags": if operation_name == "ListTags":
return { return {
"TagList": [ "TagList": [
@@ -132,45 +105,85 @@ class Test_OpenSearchService_Service:
aws_provider = set_mocked_aws_provider([]) aws_provider = set_mocked_aws_provider([])
opensearch = OpenSearchService(aws_provider) opensearch = OpenSearchService(aws_provider)
assert len(opensearch.opensearch_domains) == 1 assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[0].name == test_domain_name assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1 assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
# Test OpenSearchService describ domain config # Test OpenSearchService describ domain config
def test_describe_domain_config(self): def test_describe_domain_config(self):
aws_provider = set_mocked_aws_provider([]) aws_provider = set_mocked_aws_provider([])
opensearch = OpenSearchService(aws_provider) opensearch = OpenSearchService(aws_provider)
assert len(opensearch.opensearch_domains) == 1 assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[0].name == test_domain_name assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1 assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
assert opensearch.opensearch_domains[0].access_policy assert opensearch.opensearch_domains[domain_arn].access_policy
assert opensearch.opensearch_domains[0].logging[0].name == "SEARCH_SLOW_LOGS" assert (
assert opensearch.opensearch_domains[0].logging[0].enabled opensearch.opensearch_domains[domain_arn].logging[0].name
assert opensearch.opensearch_domains[0].logging[1].name == "INDEX_SLOW_LOGS" == "SEARCH_SLOW_LOGS"
assert opensearch.opensearch_domains[0].logging[1].enabled )
assert opensearch.opensearch_domains[0].logging[2].name == "AUDIT_LOGS" assert opensearch.opensearch_domains[domain_arn].logging[0].enabled
assert opensearch.opensearch_domains[0].logging[2].enabled assert (
opensearch.opensearch_domains[domain_arn].logging[1].name
== "INDEX_SLOW_LOGS"
)
assert opensearch.opensearch_domains[domain_arn].logging[1].enabled
assert opensearch.opensearch_domains[domain_arn].logging[2].name == "AUDIT_LOGS"
assert opensearch.opensearch_domains[domain_arn].logging[2].enabled
# Test OpenSearchService describ domain # Test OpenSearchService describ domain
@mock_aws
def test_describe_domain(self): def test_describe_domain(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_EU_WEST_1)
domain = opensearch_client.create_domain(
DomainName=test_domain_name,
EncryptionAtRestOptions={"Enabled": True},
ClusterConfig={
"InstanceCount": 1,
"ZoneAwarenessEnabled": True,
"DedicatedMasterCount": 1,
},
NodeToNodeEncryptionOptions={"Enabled": True},
AdvancedSecurityOptions={
"Enabled": True,
"InternalUserDatabaseEnabled": True,
"SAMLOptions": {"Enabled": True},
},
CognitoOptions={"Enabled": True},
VPCOptions={
"SubnetIds": ["test-subnet-id"],
"SecurityGroupIds": ["test-security-group-id"],
},
DomainEndpointOptions={"EnforceHTTPS": True},
AccessPolicies=policy_json,
EngineVersion="opensearch-version1",
TagList=[
{"Key": "test", "Value": "test"},
],
)
aws_provider = set_mocked_aws_provider([]) aws_provider = set_mocked_aws_provider([])
opensearch = OpenSearchService(aws_provider) opensearch = OpenSearchService(aws_provider)
domain_arn = domain["DomainStatus"]["ARN"]
assert len(opensearch.opensearch_domains) == 1 assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[0].name == test_domain_name assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1 assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
assert opensearch.opensearch_domains[0].arn == domain_arn assert opensearch.opensearch_domains[domain_arn].arn == domain_arn
assert opensearch.opensearch_domains[0].access_policy assert opensearch.opensearch_domains[domain_arn].access_policy
assert opensearch.opensearch_domains[0].vpc_endpoints == [ assert opensearch.opensearch_domains[domain_arn].vpc_endpoints == [
"vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com" "test.eu-west-1.es.amazonaws.com"
] ]
assert opensearch.opensearch_domains[0].vpc_id == "test-vpc-id" assert not opensearch.opensearch_domains[domain_arn].vpc_id
assert opensearch.opensearch_domains[0].cognito_options assert opensearch.opensearch_domains[domain_arn].cognito_options
assert opensearch.opensearch_domains[0].encryption_at_rest assert opensearch.opensearch_domains[domain_arn].encryption_at_rest
assert opensearch.opensearch_domains[0].node_to_node_encryption assert opensearch.opensearch_domains[domain_arn].node_to_node_encryption
assert opensearch.opensearch_domains[0].enforce_https assert opensearch.opensearch_domains[domain_arn].enforce_https
assert opensearch.opensearch_domains[0].internal_user_database assert opensearch.opensearch_domains[domain_arn].internal_user_database
assert opensearch.opensearch_domains[0].saml_enabled assert opensearch.opensearch_domains[domain_arn].saml_enabled
assert opensearch.opensearch_domains[0].update_available assert not opensearch.opensearch_domains[domain_arn].update_available
assert opensearch.opensearch_domains[0].version == "opensearch-version1" assert (
assert opensearch.opensearch_domains[0].tags == [ opensearch.opensearch_domains[domain_arn].version == "opensearch-version1"
)
assert opensearch.opensearch_domains[domain_arn].instance_count == 1
assert opensearch.opensearch_domains[domain_arn].zone_awareness_enabled
assert opensearch.opensearch_domains[domain_arn].dedicated_master_count == 1
assert opensearch.opensearch_domains[domain_arn].tags == [
{"Key": "test", "Value": "test"}, {"Key": "test", "Value": "test"},
] ]