mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-31 21:27:28 +00:00
Compare commits
5 Commits
fix/update
...
PRWLR-4969
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
185d6c482d | ||
|
|
04993f4e76 | ||
|
|
2bd9f73c1b | ||
|
|
0d743fa179 | ||
|
|
eca94e2e3b |
@@ -13,142 +13,145 @@ class OpenSearchService(AWSService):
|
|||||||
def __init__(self, provider):
|
def __init__(self, provider):
|
||||||
# Call AWSService's __init__
|
# Call AWSService's __init__
|
||||||
super().__init__("opensearch", provider)
|
super().__init__("opensearch", provider)
|
||||||
self.opensearch_domains = []
|
self.opensearch_domains = {}
|
||||||
self.__threading_call__(self._list_domain_names)
|
self.__threading_call__(self._list_domain_names)
|
||||||
self._describe_domain_config(self.regional_clients)
|
self.__threading_call__(
|
||||||
self._describe_domain(self.regional_clients)
|
self._describe_domain_config, self.opensearch_domains.values()
|
||||||
self._list_tags()
|
)
|
||||||
|
self.__threading_call__(self._describe_domain, self.opensearch_domains.values())
|
||||||
|
self.__threading_call__(self._list_tags, self.opensearch_domains.values())
|
||||||
|
|
||||||
def _list_domain_names(self, regional_client):
|
def _list_domain_names(self, regional_client):
|
||||||
logger.info("OpenSearch - listing domain names...")
|
logger.info("OpenSearch - listing domain names...")
|
||||||
try:
|
try:
|
||||||
domains = regional_client.list_domain_names()
|
domains = regional_client.list_domain_names()
|
||||||
for domain in domains["DomainNames"]:
|
for domain in domains["DomainNames"]:
|
||||||
arn = f"arn:{self.audited_partition}:opensearch:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}"
|
arn = f"arn:{self.audited_partition}:es:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}"
|
||||||
if not self.audit_resources or (
|
if not self.audit_resources or (
|
||||||
is_resource_filtered(arn, self.audit_resources)
|
is_resource_filtered(arn, self.audit_resources)
|
||||||
):
|
):
|
||||||
self.opensearch_domains.append(
|
self.opensearch_domains[arn] = OpenSearchDomain(
|
||||||
OpenSearchDomain(
|
arn=arn,
|
||||||
arn=arn,
|
name=domain["DomainName"],
|
||||||
name=domain["DomainName"],
|
region=regional_client.region,
|
||||||
region=regional_client.region,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
)
|
)
|
||||||
|
|
||||||
def _describe_domain_config(self, regional_clients):
|
def _describe_domain_config(self, domain):
|
||||||
logger.info("OpenSearch - describing domain configurations...")
|
logger.info("OpenSearch - describing domain configurations...")
|
||||||
try:
|
try:
|
||||||
for domain in self.opensearch_domains:
|
regional_client = self.regional_clients[domain.region]
|
||||||
regional_client = regional_clients[domain.region]
|
describe_domain = regional_client.describe_domain_config(
|
||||||
describe_domain = regional_client.describe_domain_config(
|
DomainName=domain.name
|
||||||
DomainName=domain.name
|
)
|
||||||
)
|
for logging_key in [
|
||||||
for logging_key in [
|
"SEARCH_SLOW_LOGS",
|
||||||
"SEARCH_SLOW_LOGS",
|
"INDEX_SLOW_LOGS",
|
||||||
"INDEX_SLOW_LOGS",
|
"AUDIT_LOGS",
|
||||||
"AUDIT_LOGS",
|
]:
|
||||||
]:
|
if logging_key in describe_domain["DomainConfig"].get(
|
||||||
if logging_key in describe_domain["DomainConfig"].get(
|
"LogPublishingOptions", {}
|
||||||
"LogPublishingOptions", {}
|
).get("Options", {}):
|
||||||
).get("Options", {}):
|
domain.logging.append(
|
||||||
domain.logging.append(
|
PublishingLoggingOption(
|
||||||
PublishingLoggingOption(
|
name=logging_key,
|
||||||
name=logging_key,
|
enabled=describe_domain["DomainConfig"][
|
||||||
enabled=describe_domain["DomainConfig"][
|
"LogPublishingOptions"
|
||||||
"LogPublishingOptions"
|
]["Options"][logging_key]["Enabled"],
|
||||||
]["Options"][logging_key]["Enabled"],
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
try:
|
|
||||||
domain.access_policy = loads(
|
|
||||||
describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
|
|
||||||
)
|
)
|
||||||
except JSONDecodeError as error:
|
|
||||||
logger.warning(
|
|
||||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
except Exception as error:
|
|
||||||
logger.error(
|
|
||||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _describe_domain(self, regional_clients):
|
|
||||||
logger.info("OpenSearch - describing domain configurations...")
|
|
||||||
try:
|
|
||||||
for domain in self.opensearch_domains:
|
|
||||||
regional_client = regional_clients[domain.region]
|
|
||||||
describe_domain = regional_client.describe_domain(
|
|
||||||
DomainName=domain.name
|
|
||||||
)
|
|
||||||
domain.arn = describe_domain["DomainStatus"]["ARN"]
|
|
||||||
domain.vpc_endpoints = None
|
|
||||||
if "Endpoints" in describe_domain["DomainStatus"]:
|
|
||||||
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
|
|
||||||
domain.vpc_endpoints = [
|
|
||||||
vpc
|
|
||||||
for vpc in describe_domain["DomainStatus"][
|
|
||||||
"Endpoints"
|
|
||||||
].values()
|
|
||||||
]
|
|
||||||
|
|
||||||
domain.vpc_id = None
|
|
||||||
if "VPCOptions" in describe_domain["DomainStatus"]:
|
|
||||||
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"][
|
|
||||||
"VPCId"
|
|
||||||
]
|
|
||||||
domain.cognito_options = describe_domain["DomainStatus"][
|
|
||||||
"CognitoOptions"
|
|
||||||
]["Enabled"]
|
|
||||||
domain.encryption_at_rest = describe_domain["DomainStatus"][
|
|
||||||
"EncryptionAtRestOptions"
|
|
||||||
]["Enabled"]
|
|
||||||
domain.node_to_node_encryption = describe_domain["DomainStatus"][
|
|
||||||
"NodeToNodeEncryptionOptions"
|
|
||||||
]["Enabled"]
|
|
||||||
domain.enforce_https = describe_domain["DomainStatus"][
|
|
||||||
"DomainEndpointOptions"
|
|
||||||
]["EnforceHTTPS"]
|
|
||||||
domain.internal_user_database = describe_domain["DomainStatus"][
|
|
||||||
"AdvancedSecurityOptions"
|
|
||||||
]["InternalUserDatabaseEnabled"]
|
|
||||||
domain.saml_enabled = (
|
|
||||||
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
|
|
||||||
.get("SAMLOptions", {})
|
|
||||||
.get("Enabled", False)
|
|
||||||
)
|
|
||||||
domain.update_available = describe_domain["DomainStatus"][
|
|
||||||
"ServiceSoftwareOptions"
|
|
||||||
]["UpdateAvailable"]
|
|
||||||
domain.version = describe_domain["DomainStatus"]["EngineVersion"]
|
|
||||||
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
|
|
||||||
"AdvancedSecurityOptions"
|
|
||||||
]["Enabled"]
|
|
||||||
except Exception as error:
|
|
||||||
logger.error(
|
|
||||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _list_tags(self):
|
|
||||||
logger.info("OpenSearch - List Tags...")
|
|
||||||
for domain in self.opensearch_domains:
|
|
||||||
try:
|
try:
|
||||||
regional_client = self.regional_clients[domain.region]
|
domain.access_policy = loads(
|
||||||
response = regional_client.list_tags(
|
describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
|
||||||
ARN=domain.arn,
|
)
|
||||||
)["TagList"]
|
except JSONDecodeError as error:
|
||||||
domain.tags = response
|
logger.warning(
|
||||||
except Exception as error:
|
|
||||||
logger.error(
|
|
||||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _describe_domain(self, domain):
|
||||||
|
logger.info("OpenSearch - describing domain configurations...")
|
||||||
|
try:
|
||||||
|
regional_client = self.regional_clients[domain.region]
|
||||||
|
describe_domain = regional_client.describe_domain(DomainName=domain.name)
|
||||||
|
domain.arn = describe_domain["DomainStatus"]["ARN"]
|
||||||
|
domain.vpc_endpoints = None
|
||||||
|
if "Endpoints" in describe_domain["DomainStatus"]:
|
||||||
|
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
|
||||||
|
domain.vpc_endpoints = [
|
||||||
|
vpc
|
||||||
|
for vpc in describe_domain["DomainStatus"]["Endpoints"].values()
|
||||||
|
]
|
||||||
|
|
||||||
|
domain.vpc_id = None
|
||||||
|
if "VPCOptions" in describe_domain["DomainStatus"]:
|
||||||
|
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"].get(
|
||||||
|
"VPCId", None
|
||||||
|
)
|
||||||
|
domain.cognito_options = describe_domain["DomainStatus"][
|
||||||
|
"CognitoOptions"
|
||||||
|
].get("Enabled", False)
|
||||||
|
domain.encryption_at_rest = describe_domain["DomainStatus"][
|
||||||
|
"EncryptionAtRestOptions"
|
||||||
|
].get("Enabled", False)
|
||||||
|
domain.node_to_node_encryption = describe_domain["DomainStatus"][
|
||||||
|
"NodeToNodeEncryptionOptions"
|
||||||
|
].get("Enabled", False)
|
||||||
|
domain.enforce_https = describe_domain["DomainStatus"][
|
||||||
|
"DomainEndpointOptions"
|
||||||
|
].get("EnforceHTTPS", False)
|
||||||
|
domain.internal_user_database = describe_domain["DomainStatus"][
|
||||||
|
"AdvancedSecurityOptions"
|
||||||
|
].get("InternalUserDatabaseEnabled", False)
|
||||||
|
domain.saml_enabled = (
|
||||||
|
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
|
||||||
|
.get("SAMLOptions", {})
|
||||||
|
.get("Enabled", False)
|
||||||
|
)
|
||||||
|
domain.update_available = (
|
||||||
|
describe_domain["DomainStatus"]
|
||||||
|
.get("ServiceSoftwareOptions", {"UpdateAvailable": False})
|
||||||
|
.get("UpdateAvailable", False)
|
||||||
|
)
|
||||||
|
domain.version = describe_domain["DomainStatus"].get("EngineVersion", None)
|
||||||
|
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
|
||||||
|
"AdvancedSecurityOptions"
|
||||||
|
].get("Enabled", False)
|
||||||
|
domain.instance_count = describe_domain["DomainStatus"][
|
||||||
|
"ClusterConfig"
|
||||||
|
].get("InstanceCount", None)
|
||||||
|
domain.zone_awareness_enabled = describe_domain["DomainStatus"][
|
||||||
|
"ClusterConfig"
|
||||||
|
].get("ZoneAwarenessEnabled", False)
|
||||||
|
domain.dedicated_master_count = describe_domain["DomainStatus"][
|
||||||
|
"ClusterConfig"
|
||||||
|
].get("DedicatedMasterCount", None)
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _list_tags(self, domain):
|
||||||
|
logger.info("OpenSearch - List Tags...")
|
||||||
|
try:
|
||||||
|
regional_client = self.regional_clients[domain.region]
|
||||||
|
response = regional_client.list_tags(
|
||||||
|
ARN=domain.arn,
|
||||||
|
)["TagList"]
|
||||||
|
domain.tags = response
|
||||||
|
except Exception as error:
|
||||||
|
logger.error(
|
||||||
|
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class PublishingLoggingOption(BaseModel):
|
class PublishingLoggingOption(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
@@ -171,5 +174,8 @@ class OpenSearchDomain(BaseModel):
|
|||||||
saml_enabled: bool = None
|
saml_enabled: bool = None
|
||||||
update_available: bool = None
|
update_available: bool = None
|
||||||
version: str = None
|
version: str = None
|
||||||
|
instance_count: Optional[int]
|
||||||
|
zone_awareness_enabled: Optional[bool]
|
||||||
|
dedicated_master_count: Optional[int]
|
||||||
tags: Optional[list] = []
|
tags: Optional[list] = []
|
||||||
advanced_settings_enabled: bool = None
|
advanced_settings_enabled: bool = None
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_access_control_enabled(Check):
|
class opensearch_service_domains_access_control_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -0,0 +1,34 @@
|
|||||||
|
{
|
||||||
|
"Provider": "aws",
|
||||||
|
"CheckID": "opensearch_service_domains_at_least_three_master_nodes",
|
||||||
|
"CheckTitle": "Elasticsearch domains should be configured with at least three dedicated master nodes",
|
||||||
|
"CheckType": [
|
||||||
|
"Software and Configuration Checks/AWS Security Best Practices"
|
||||||
|
],
|
||||||
|
"ServiceName": "elasticsearch",
|
||||||
|
"SubServiceName": "domain",
|
||||||
|
"ResourceIdTemplate": "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
|
||||||
|
"Severity": "medium",
|
||||||
|
"ResourceType": "AwsElasticsearchDomain",
|
||||||
|
"Description": "This control checks whether Elasticsearch domains are configured with at least three dedicated primary nodes. This control fails if the domain does not use dedicated primary nodes. Using more than three primary nodes may not provide significant additional availability benefits, while incurring extra costs.",
|
||||||
|
"Risk": "Without at least three dedicated master nodes, the Elasticsearch domain's fault-tolerance and ability to handle cluster management operations during node failures may be compromised, leading to potential data unavailability.",
|
||||||
|
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
|
||||||
|
"Remediation": {
|
||||||
|
"Code": {
|
||||||
|
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config DedicatedMasterEnabled=true,MasterInstanceCount=3",
|
||||||
|
"NativeIaC": "",
|
||||||
|
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-7",
|
||||||
|
"Terraform": ""
|
||||||
|
},
|
||||||
|
"Recommendation": {
|
||||||
|
"Text": "Configure Elasticsearch domains with at least three dedicated master nodes for high availability and cluster fault tolerance.",
|
||||||
|
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Categories": [
|
||||||
|
"redundancy"
|
||||||
|
],
|
||||||
|
"DependsOn": [],
|
||||||
|
"RelatedTo": [],
|
||||||
|
"Notes": ""
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||||
|
opensearch_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class opensearch_service_domains_at_least_three_master_nodes(Check):
|
||||||
|
def execute(self):
|
||||||
|
findings = []
|
||||||
|
|
||||||
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
|
report = Check_Report_AWS(self.metadata())
|
||||||
|
report.region = domain.region
|
||||||
|
report.resource_id = domain.name
|
||||||
|
report.resource_arn = domain.arn
|
||||||
|
report.resource_tags = domain.tags
|
||||||
|
|
||||||
|
report.status = "FAIL"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} has only {domain.dedicated_master_count} master nodes."
|
||||||
|
|
||||||
|
if domain.dedicated_master_count >= 3:
|
||||||
|
report.status = "PASS"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} has {domain.dedicated_master_count} master nodes."
|
||||||
|
|
||||||
|
findings.append(report)
|
||||||
|
|
||||||
|
return findings
|
||||||
@@ -0,0 +1,34 @@
|
|||||||
|
{
|
||||||
|
"Provider": "aws",
|
||||||
|
"CheckID": "opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled",
|
||||||
|
"CheckTitle": "Elasticsearch/Opensearch domains should have at least three data nodes",
|
||||||
|
"CheckType": [
|
||||||
|
"Software and Configuration Checks/AWS Security Best Practices"
|
||||||
|
],
|
||||||
|
"ServiceName": "opensearch",
|
||||||
|
"SubServiceName": "",
|
||||||
|
"ResourceIdTemplate": "arn:partition:service:{region}:{account-id}:domain/{domain-name}",
|
||||||
|
"Severity": "medium",
|
||||||
|
"ResourceType": "AwsElasticsearchDomain",
|
||||||
|
"Description": "This control checks whether Elasticsearch/Opensearch domains are configured with at least three data nodes and zoneAwarenessEnabled is true.",
|
||||||
|
"Risk": "Without at least three data nodes, the Elasticsearch/Opensearch domain may not be fault-tolerant, leading to a higher risk of data loss or unavailability in case of node failure.",
|
||||||
|
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
|
||||||
|
"Remediation": {
|
||||||
|
"Code": {
|
||||||
|
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config InstanceCount=3,ZoneAwarenessEnabled=true",
|
||||||
|
"NativeIaC": "",
|
||||||
|
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-6",
|
||||||
|
"Terraform": ""
|
||||||
|
},
|
||||||
|
"Recommendation": {
|
||||||
|
"Text": "Modify the Elasticsearch/Opensearch domain to ensure at least three data nodes and enable zone awareness for high availability.",
|
||||||
|
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"Categories": [
|
||||||
|
"redundancy"
|
||||||
|
],
|
||||||
|
"DependsOn": [],
|
||||||
|
"RelatedTo": [],
|
||||||
|
"Notes": ""
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||||
|
opensearch_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled(Check):
|
||||||
|
def execute(self):
|
||||||
|
findings = []
|
||||||
|
|
||||||
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
|
report = Check_Report_AWS(self.metadata())
|
||||||
|
report.region = domain.region
|
||||||
|
report.resource_id = domain.name
|
||||||
|
report.resource_arn = domain.arn
|
||||||
|
report.resource_tags = domain.tags
|
||||||
|
|
||||||
|
report.status = "FAIL"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
|
||||||
|
|
||||||
|
if domain.instance_count >= 3 and domain.zone_awareness_enabled:
|
||||||
|
report.status = "PASS"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes and zone awareness enabled."
|
||||||
|
elif domain.instance_count >= 3 and not domain.zone_awareness_enabled:
|
||||||
|
report.status = "FAIL"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes, but zone awareness is not enabled."
|
||||||
|
elif domain.instance_count < 3 and domain.zone_awareness_enabled:
|
||||||
|
report.status = "FAIL"
|
||||||
|
report.status_extended = f"Opensearch domain {domain.name} has zone awareness enabled, but only {domain.instance_count} data nodes."
|
||||||
|
|
||||||
|
findings.append(report)
|
||||||
|
|
||||||
|
return findings
|
||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_audit_logging_enabled(Check):
|
class opensearch_service_domains_audit_logging_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_cloudwatch_logging_enabled(Check):
|
class opensearch_service_domains_cloudwatch_logging_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_encryption_at_rest_enabled(Check):
|
class opensearch_service_domains_encryption_at_rest_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_https_communications_enforced(Check):
|
class opensearch_service_domains_https_communications_enforced(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_internal_user_database_enabled(Check):
|
class opensearch_service_domains_internal_user_database_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_node_to_node_encryption_enabled(Check):
|
class opensearch_service_domains_node_to_node_encryption_enabled(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_not_publicly_accessible(Check):
|
class opensearch_service_domains_not_publicly_accessible(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -7,14 +7,14 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_updated_to_the_latest_service_software_version(Check):
|
class opensearch_service_domains_updated_to_the_latest_service_software_version(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
report.resource_arn = domain.arn
|
report.resource_arn = domain.arn
|
||||||
report.resource_tags = domain.tags
|
report.resource_tags = domain.tags
|
||||||
report.status = "PASS"
|
report.status = "PASS"
|
||||||
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available."
|
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available."
|
||||||
if domain.update_available:
|
if domain.update_available:
|
||||||
report.status = "FAIL"
|
report.status = "FAIL"
|
||||||
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} has internal updates available."
|
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} has internal updates available."
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
|||||||
class opensearch_service_domains_use_cognito_authentication_for_kibana(Check):
|
class opensearch_service_domains_use_cognito_authentication_for_kibana(Check):
|
||||||
def execute(self):
|
def execute(self):
|
||||||
findings = []
|
findings = []
|
||||||
for domain in opensearch_client.opensearch_domains:
|
for domain in opensearch_client.opensearch_domains.values():
|
||||||
report = Check_Report_AWS(self.metadata())
|
report = Check_Report_AWS(self.metadata())
|
||||||
report.region = domain.region
|
report.region = domain.region
|
||||||
report.resource_id = domain.name
|
report.resource_id = domain.name
|
||||||
|
|||||||
@@ -1,26 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_access_control_enabled:
|
class Test_opensearch_service_domains_access_control_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||||
opensearch_service_domains_access_control_enabled,
|
opensearch_service_domains_access_control_enabled,
|
||||||
@@ -30,22 +36,26 @@ class Test_opensearch_service_domains_access_control_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_fine_grained_access_enabled(self):
|
def test_no_fine_grained_access_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain",
|
||||||
OpenSearchDomain(
|
AdvancedSecurityOptions={"Enabled": False},
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].advanced_settings_enabled = False
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||||
opensearch_service_domains_access_control_enabled,
|
opensearch_service_domains_access_control_enabled,
|
||||||
@@ -55,29 +65,36 @@ class Test_opensearch_service_domains_access_control_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"does not have fine grained access control enabled.",
|
f"Opensearch domain {domain["DomainStatus"]["DomainName"]} does not have fine grained access control enabled."
|
||||||
result[0].status_extended,
|
== result[0].status_extended
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
def test_logging_AUDIT_LOGS_enabled(self):
|
@mock_aws
|
||||||
opensearch_client = mock.MagicMock
|
def test_fine_grained_access_enabled(self):
|
||||||
opensearch_client.opensearch_domains = []
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains.append(
|
domain = opensearch_client.create_domain(
|
||||||
OpenSearchDomain(
|
DomainName="test-domain",
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
AdvancedSecurityOptions={"Enabled": True},
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].advanced_settings_enabled = True
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||||
opensearch_service_domains_access_control_enabled,
|
opensearch_service_domains_access_control_enabled,
|
||||||
@@ -87,8 +104,12 @@ class Test_opensearch_service_domains_access_control_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search(
|
assert (
|
||||||
"has fine grained access control enabled.", result[0].status_extended
|
f"Opensearch domain {domain["DomainStatus"]["DomainName"]} has fine grained access control enabled."
|
||||||
|
== result[0].status_extended
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|||||||
@@ -0,0 +1,115 @@
|
|||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from boto3 import client
|
||||||
|
from moto import mock_aws
|
||||||
|
|
||||||
|
from tests.providers.aws.utils import (
|
||||||
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_opensearch_service_domains_at_least_three_master_nodes:
|
||||||
|
@mock_aws
|
||||||
|
def test_no_domains(self):
|
||||||
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||||
|
opensearch_service_domains_at_least_three_master_nodes,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_less_than_three_master_nodes(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain-2-nodes",
|
||||||
|
ClusterConfig={"DedicatedMasterCount": 2},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||||
|
opensearch_service_domains_at_least_three_master_nodes,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "FAIL"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain-2-nodes has only 2 master nodes."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_at_least_three_master_nodes(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain-3-nodes",
|
||||||
|
ClusterConfig={"DedicatedMasterCount": 3},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||||
|
opensearch_service_domains_at_least_three_master_nodes,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "PASS"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain-3-nodes has 3 master nodes."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
@@ -0,0 +1,203 @@
|
|||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from boto3 import client
|
||||||
|
from moto import mock_aws
|
||||||
|
|
||||||
|
from tests.providers.aws.utils import (
|
||||||
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Test_opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled:
|
||||||
|
@mock_aws
|
||||||
|
def test_no_domains(self):
|
||||||
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_instance_count_less_than_three_zoneawareness_enabled(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain",
|
||||||
|
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 2},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "FAIL"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain has zone awareness enabled, but only 2 data nodes."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_instance_count_more_than_three_zoneawareness_not_enabled(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain",
|
||||||
|
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 3},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "FAIL"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain has 3 data nodes, but zone awareness is not enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_instance_count_less_than_three_zoneawareness_not_enabled(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain",
|
||||||
|
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 2},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "FAIL"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_logging_instance_count_more_than_three_zoneawareness_enabled(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain",
|
||||||
|
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 3},
|
||||||
|
)
|
||||||
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "PASS"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain has 3 data nodes and zone awareness enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
@@ -1,27 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
PublishingLoggingOption,
|
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_audit_logging_enabled:
|
class Test_opensearch_service_domains_audit_logging_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||||
opensearch_service_domains_audit_logging_enabled,
|
opensearch_service_domains_audit_logging_enabled,
|
||||||
@@ -31,22 +36,29 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_logging_enabled(self):
|
def test_no_logging_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-logging",
|
||||||
OpenSearchDomain(
|
LogPublishingOptions={
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
"SEARCH_SLOW_LOGS": {
|
||||||
)
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*"
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||||
opensearch_service_domains_audit_logging_enabled,
|
opensearch_service_domains_audit_logging_enabled,
|
||||||
@@ -56,29 +68,40 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search("AUDIT_LOGS disabled", result[0].status_extended)
|
assert (
|
||||||
assert result[0].resource_id == domain_name
|
result[0].status_extended
|
||||||
assert result[0].resource_arn == domain_arn
|
== "Opensearch domain test-domain-no-logging AUDIT_LOGS disabled."
|
||||||
|
|
||||||
def test_logging_AUDIT_LOGS_enabled(self):
|
|
||||||
opensearch_client = mock.MagicMock
|
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
opensearch_client.opensearch_domains.append(
|
|
||||||
OpenSearchDomain(
|
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
|
||||||
)
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_logging_AUDIT_LOGS_enabled(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain-logging",
|
||||||
|
LogPublishingOptions={
|
||||||
|
"AUDIT_LOGS": {
|
||||||
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:audit-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
opensearch_client.opensearch_domains[0].logging.append(
|
OpenSearchService,
|
||||||
PublishingLoggingOption(name="AUDIT_LOGS", enabled=True)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||||
opensearch_service_domains_audit_logging_enabled,
|
opensearch_service_domains_audit_logging_enabled,
|
||||||
@@ -88,6 +111,12 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search("AUDIT_LOGS enabled", result[0].status_extended)
|
assert (
|
||||||
assert result[0].resource_id == domain_name
|
result[0].status_extended
|
||||||
assert result[0].resource_arn == domain_arn
|
== "Opensearch domain test-domain-logging AUDIT_LOGS enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,27 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
PublishingLoggingOption,
|
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||||
@@ -31,22 +36,30 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_logging_enabled(self):
|
def test_no_logging_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-logging",
|
||||||
OpenSearchDomain(
|
LogPublishingOptions={
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
"AUDIT_LOGS": {
|
||||||
)
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||||
@@ -56,32 +69,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain-no-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_logging_SEARCH_SLOW_LOGS_enabled(self):
|
def test_logging_SEARCH_SLOW_LOGS_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-index-logging",
|
||||||
OpenSearchDomain(
|
LogPublishingOptions={
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
"SEARCH_SLOW_LOGS": {
|
||||||
)
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
opensearch_client.opensearch_domains[0].logging.append(
|
OpenSearchService,
|
||||||
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||||
@@ -91,32 +112,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain-no-index-logging SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_logging_INDEX_SLOW_LOGS_enabled(self):
|
def test_logging_INDEX_SLOW_LOGS_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-search-logging",
|
||||||
OpenSearchDomain(
|
LogPublishingOptions={
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
"INDEX_SLOW_LOGS": {
|
||||||
)
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
opensearch_client.opensearch_domains[0].logging.append(
|
OpenSearchService,
|
||||||
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||||
@@ -126,34 +155,44 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain-no-search-logging INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_logging_INDEX_SLOW_LOGS_and_SEARCH_SLOW_LOGS_enabled(self):
|
def test_logging_INDEX_SLOW_LOGS_and_SEARCH_SLOW_LOGS_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-logging",
|
||||||
OpenSearchDomain(
|
LogPublishingOptions={
|
||||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
"SEARCH_SLOW_LOGS": {
|
||||||
)
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
"INDEX_SLOW_LOGS": {
|
||||||
|
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||||
|
"Enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
logging_options = [
|
OpenSearchService,
|
||||||
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True),
|
)
|
||||||
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True),
|
|
||||||
]
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
opensearch_client.opensearch_domains[0].logging.extend(logging_options)
|
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||||
@@ -163,9 +202,12 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search(
|
assert (
|
||||||
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|||||||
@@ -1,26 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||||
opensearch_service_domains_encryption_at_rest_enabled,
|
opensearch_service_domains_encryption_at_rest_enabled,
|
||||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_encryption_at_rest_enabled(self):
|
def test_no_encryption_at_rest_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-encryption",
|
||||||
OpenSearchDomain(
|
EncryptionAtRestOptions={"Enabled": False},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
encryption_at_rest=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||||
opensearch_service_domains_encryption_at_rest_enabled,
|
opensearch_service_domains_encryption_at_rest_enabled,
|
||||||
@@ -58,31 +64,35 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"does not have encryption at-rest enabled", result[0].status_extended
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain-no-encryption does not have encryption at-rest enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_encryption_at_rest_enabled(self):
|
def test_encryption_at_rest_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-encryption-at-rest",
|
||||||
OpenSearchDomain(
|
EncryptionAtRestOptions={"Enabled": True},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
encryption_at_rest=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||||
opensearch_service_domains_encryption_at_rest_enabled,
|
opensearch_service_domains_encryption_at_rest_enabled,
|
||||||
@@ -92,6 +102,12 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search("has encryption at-rest enabled", result[0].status_extended)
|
assert (
|
||||||
assert result[0].resource_id == domain_name
|
result[0].status_extended
|
||||||
assert result[0].resource_arn == domain_arn
|
== "Opensearch domain test-domain-encryption-at-rest has encryption at-rest enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,26 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_https_communications_enforced:
|
class Test_opensearch_service_domains_https_communications_enforced:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||||
opensearch_service_domains_https_communications_enforced,
|
opensearch_service_domains_https_communications_enforced,
|
||||||
@@ -30,25 +36,27 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_https_enabled(self):
|
def test_no_https_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-https",
|
||||||
OpenSearchDomain(
|
DomainEndpointOptions={
|
||||||
name=domain_name,
|
"EnforceHTTPS": False,
|
||||||
region=AWS_REGION_EU_WEST_1,
|
},
|
||||||
arn=domain_arn,
|
|
||||||
enforce_https=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||||
opensearch_service_domains_https_communications_enforced,
|
opensearch_service_domains_https_communications_enforced,
|
||||||
@@ -58,31 +66,37 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"does not have enforce HTTPS enabled", result[0].status_extended
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain-no-https does not have enforce HTTPS enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_https_enabled(self):
|
def test_https_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-https",
|
||||||
OpenSearchDomain(
|
DomainEndpointOptions={
|
||||||
name=domain_name,
|
"EnforceHTTPS": True,
|
||||||
region=AWS_REGION_EU_WEST_1,
|
},
|
||||||
arn=domain_arn,
|
|
||||||
enforce_https=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||||
opensearch_service_domains_https_communications_enforced,
|
opensearch_service_domains_https_communications_enforced,
|
||||||
@@ -92,6 +106,12 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search("has enforce HTTPS enabled", result[0].status_extended)
|
assert (
|
||||||
assert result[0].resource_id == domain_name
|
result[0].status_extended
|
||||||
assert result[0].resource_arn == domain_arn
|
== "Opensearch domain test-domain-https has enforce HTTPS enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,26 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_internal_user_database_enabled:
|
class Test_opensearch_service_domains_internal_user_database_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||||
opensearch_service_domains_internal_user_database_enabled,
|
opensearch_service_domains_internal_user_database_enabled,
|
||||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_internal_database_disabled(self):
|
def test_internal_database_disabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain",
|
||||||
OpenSearchDomain(
|
AdvancedSecurityOptions={"InternalUserDatabaseEnabled": False},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
internal_user_database=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||||
opensearch_service_domains_internal_user_database_enabled,
|
opensearch_service_domains_internal_user_database_enabled,
|
||||||
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search(
|
assert (
|
||||||
"does not have internal user database enabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain does not have internal user database enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_internal_database_enabled(self):
|
def test_internal_database_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain",
|
||||||
OpenSearchDomain(
|
AdvancedSecurityOptions={"InternalUserDatabaseEnabled": True},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
internal_user_database=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||||
opensearch_service_domains_internal_user_database_enabled,
|
opensearch_service_domains_internal_user_database_enabled,
|
||||||
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"has internal user database enabled", result[0].status_extended
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain has internal user database enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|||||||
@@ -1,26 +1,32 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_no_encryption_enabled(self):
|
def test_no_encryption_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-encryption",
|
||||||
OpenSearchDomain(
|
NodeToNodeEncryptionOptions={"Enabled": False},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
node_to_node_encryption=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||||
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search(
|
assert (
|
||||||
"does not have node-to-node encryption enabled",
|
result[0].status_extended
|
||||||
result[0].status_extended,
|
== "Opensearch domain test-domain-no-encryption does not have node-to-node encryption enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_encryption_enabled(self):
|
def test_encryption_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-encryption",
|
||||||
OpenSearchDomain(
|
NodeToNodeEncryptionOptions={"Enabled": True},
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
node_to_node_encryption=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||||
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert search(
|
assert (
|
||||||
"has node-to-node encryption enabled", result[0].status_extended
|
result[0].status_extended
|
||||||
|
== "Opensearch domain test-domain-encryption has node-to-node encryption enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|||||||
@@ -306,7 +306,7 @@ class Test_opensearch_service_domains_not_publicly_accessible:
|
|||||||
|
|
||||||
def test_domain_inside_vpc(self):
|
def test_domain_inside_vpc(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = mock.MagicMock
|
||||||
opensearch_client.opensearch_domains = []
|
opensearch_client.opensearch_domains = {}
|
||||||
|
|
||||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
|
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
|
||||||
|
|
||||||
@@ -324,13 +324,12 @@ class Test_opensearch_service_domains_not_publicly_accessible:
|
|||||||
opensearch_service_domains_not_publicly_accessible,
|
opensearch_service_domains_not_publicly_accessible,
|
||||||
)
|
)
|
||||||
|
|
||||||
opensearch_client.opensearch_domains.append(
|
domain_arn = f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||||
OpenSearchDomain(
|
opensearch_client.opensearch_domains[domain_arn] = OpenSearchDomain(
|
||||||
name=domain_name,
|
name=domain_name,
|
||||||
region=AWS_REGION_US_WEST_2,
|
region=AWS_REGION_US_WEST_2,
|
||||||
arn=f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}",
|
arn=domain_arn,
|
||||||
vpc_id="vpc-123456",
|
vpc_id="vpc-123456",
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
check = opensearch_service_domains_not_publicly_accessible()
|
check = opensearch_service_domains_not_publicly_accessible()
|
||||||
|
|||||||
@@ -1,26 +1,76 @@
|
|||||||
from re import search
|
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
import botocore
|
||||||
OpenSearchDomain,
|
from boto3 import client
|
||||||
)
|
from moto import mock_aws
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
make_api_call = botocore.client.BaseClient._make_api_call
|
||||||
|
|
||||||
|
|
||||||
|
def mock_make_api_call(self, operation_name, kwarg):
|
||||||
|
if operation_name == "ListDomainNames":
|
||||||
|
return {
|
||||||
|
"DomainNames": [
|
||||||
|
{
|
||||||
|
"DomainName": "test-domain-updates",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
if operation_name == "DescribeDomain":
|
||||||
|
return {
|
||||||
|
"DomainStatus": {
|
||||||
|
"DomainName": "test-domain-updates",
|
||||||
|
"EngineVersion": "OpenSearch2.0",
|
||||||
|
"ServiceSoftwareOptions": {
|
||||||
|
"UpdateAvailable": True,
|
||||||
|
},
|
||||||
|
"ARN": f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates",
|
||||||
|
"ClusterConfig": {
|
||||||
|
"InstanceCount": 1,
|
||||||
|
},
|
||||||
|
"AdvancedSecurityOptions": {
|
||||||
|
"InternalUserDatabaseEnabled": False,
|
||||||
|
},
|
||||||
|
"CognitoOptions": {
|
||||||
|
"Enabled": False,
|
||||||
|
},
|
||||||
|
"EncryptionAtRestOptions": {
|
||||||
|
"Enabled": False,
|
||||||
|
},
|
||||||
|
"NodeToNodeEncryptionOptions": {
|
||||||
|
"Enabled": False,
|
||||||
|
},
|
||||||
|
"DomainEndpointOptions": {
|
||||||
|
"EnforceHTTPS": False,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return make_api_call(self, operation_name, kwarg)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_updated_to_the_latest_service_software_version:
|
class Test_opensearch_service_domains_updated_to_the_latest_service_software_version:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||||
@@ -32,61 +82,22 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
def test_internal_update_available(self):
|
@mock_aws
|
||||||
opensearch_client = mock.MagicMock
|
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||||
opensearch_client.opensearch_domains = []
|
def test_updates_available(self):
|
||||||
opensearch_client.opensearch_domains.append(
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
OpenSearchDomain(
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
name=domain_name,
|
OpenSearchService,
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
update_available=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
new=opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
|
||||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
|
||||||
)
|
|
||||||
|
|
||||||
check = (
|
|
||||||
opensearch_service_domains_updated_to_the_latest_service_software_version()
|
|
||||||
)
|
|
||||||
result = check.execute()
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0].status == "PASS"
|
|
||||||
assert search(
|
|
||||||
"does not have internal updates available", result[0].status_extended
|
|
||||||
)
|
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
def test_internal_database_enabled(self):
|
|
||||||
opensearch_client = mock.MagicMock
|
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
opensearch_client.opensearch_domains.append(
|
|
||||||
OpenSearchDomain(
|
|
||||||
name=domain_name,
|
|
||||||
region=AWS_REGION_EU_WEST_1,
|
|
||||||
arn=domain_arn,
|
|
||||||
update_available=True,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
opensearch_client.opensearch_domains[0].logging = []
|
|
||||||
|
|
||||||
with mock.patch(
|
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
|
||||||
new=opensearch_client,
|
|
||||||
), mock.patch(
|
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
|
||||||
new=opensearch_client,
|
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||||
@@ -98,6 +109,52 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert search("has internal updates available", result[0].status_extended)
|
assert (
|
||||||
assert result[0].resource_id == domain_name
|
result[0].status_extended
|
||||||
assert result[0].resource_arn == domain_arn
|
== "Opensearch domain test-domain-updates with version OpenSearch2.0 has internal updates available."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == "test-domain-updates"
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates"
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
|
def test_no_updates_availables(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName="test-domain-no-updates",
|
||||||
|
SoftwareUpdateOptions={"AutoSoftwareUpdateEnabled": True},
|
||||||
|
)
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
|
return_value=mocked_aws_provider,
|
||||||
|
), mock.patch(
|
||||||
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||||
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
|
):
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||||
|
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
check = (
|
||||||
|
opensearch_service_domains_updated_to_the_latest_service_software_version()
|
||||||
|
)
|
||||||
|
result = check.execute()
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0].status == "PASS"
|
||||||
|
assert (
|
||||||
|
result[0].status_extended
|
||||||
|
== f"Opensearch domain test-domain-no-updates with version {domain["DomainStatus"]["EngineVersion"]} does not have internal updates available."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,25 +1,32 @@
|
|||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from boto3 import client
|
||||||
OpenSearchDomain,
|
from moto import mock_aws
|
||||||
)
|
|
||||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
|
||||||
|
|
||||||
domain_name = "test-domain"
|
from tests.providers.aws.utils import (
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
AWS_ACCOUNT_NUMBER,
|
||||||
|
AWS_REGION_US_EAST_1,
|
||||||
|
set_mocked_aws_provider,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||||
|
@mock_aws
|
||||||
def test_no_domains(self):
|
def test_no_domains(self):
|
||||||
opensearch_client = mock.MagicMock
|
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
|
||||||
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
|
OpenSearchService,
|
||||||
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||||
@@ -29,25 +36,24 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
|||||||
result = check.execute()
|
result = check.execute()
|
||||||
assert len(result) == 0
|
assert len(result) == 0
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_neither_cognito_nor_saml_enabled(self):
|
def test_neither_cognito_nor_saml_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-no-cognito-no-saml",
|
||||||
OpenSearchDomain(
|
)
|
||||||
name=domain_name,
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
region=AWS_REGION_EU_WEST_1,
|
OpenSearchService,
|
||||||
arn=domain_arn,
|
|
||||||
cognito_options=False,
|
|
||||||
saml_enabled=False,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||||
@@ -59,31 +65,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
|||||||
assert result[0].status == "FAIL"
|
assert result[0].status == "FAIL"
|
||||||
assert (
|
assert (
|
||||||
result[0].status_extended
|
result[0].status_extended
|
||||||
== f"Opensearch domain {domain_name} has neither Amazon Cognito nor SAML authentication for Kibana enabled."
|
== "Opensearch domain test-domain-no-cognito-no-saml has neither Amazon Cognito nor SAML authentication for Kibana enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
assert result[0].resource_tags == []
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_cognito_enabled(self):
|
def test_cognito_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-cognito",
|
||||||
OpenSearchDomain(
|
CognitoOptions={"Enabled": True},
|
||||||
name=domain_name,
|
)
|
||||||
region=AWS_REGION_EU_WEST_1,
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
arn=domain_arn,
|
OpenSearchService,
|
||||||
cognito_options=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||||
@@ -95,32 +103,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
|||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert (
|
assert (
|
||||||
result[0].status_extended
|
result[0].status_extended
|
||||||
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled."
|
== "Opensearch domain test-domain-cognito has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
assert result[0].resource_tags == []
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|
||||||
|
@mock_aws
|
||||||
def test_saml_enabled(self):
|
def test_saml_enabled(self):
|
||||||
opensearch_client = mock.MagicMock
|
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||||
opensearch_client.opensearch_domains = []
|
domain = opensearch_client.create_domain(
|
||||||
opensearch_client.opensearch_domains.append(
|
DomainName="test-domain-saml",
|
||||||
OpenSearchDomain(
|
AdvancedSecurityOptions={"SAMLOptions": {"Enabled": True}},
|
||||||
name=domain_name,
|
)
|
||||||
region=AWS_REGION_EU_WEST_1,
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
arn=domain_arn,
|
OpenSearchService,
|
||||||
saml_enabled=True,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||||
|
|
||||||
with mock.patch(
|
with mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||||
opensearch_client,
|
return_value=mocked_aws_provider,
|
||||||
), mock.patch(
|
), mock.patch(
|
||||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||||
new=opensearch_client,
|
new=OpenSearchService(mocked_aws_provider),
|
||||||
):
|
):
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||||
@@ -132,10 +141,10 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
|||||||
assert result[0].status == "PASS"
|
assert result[0].status == "PASS"
|
||||||
assert (
|
assert (
|
||||||
result[0].status_extended
|
result[0].status_extended
|
||||||
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled."
|
== "Opensearch domain test-domain-saml has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||||
|
)
|
||||||
|
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||||
|
assert (
|
||||||
|
result[0].resource_arn
|
||||||
|
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||||
)
|
)
|
||||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
|
||||||
assert result[0].resource_id == domain_name
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
assert result[0].resource_tags == []
|
|
||||||
assert result[0].resource_arn == domain_arn
|
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ from json import dumps
|
|||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import botocore
|
import botocore
|
||||||
|
from boto3 import client
|
||||||
|
from moto import mock_aws
|
||||||
|
|
||||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||||
OpenSearchService,
|
OpenSearchService,
|
||||||
@@ -13,7 +15,7 @@ from tests.providers.aws.utils import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
test_domain_name = "test"
|
test_domain_name = "test"
|
||||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}"
|
domain_arn = f"arn:aws:es:eu-west-1:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}"
|
||||||
|
|
||||||
policy_data = {
|
policy_data = {
|
||||||
"Version": "2012-10-17",
|
"Version": "2012-10-17",
|
||||||
@@ -56,35 +58,6 @@ def mock_make_api_call(self, operation_name, kwarg):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if operation_name == "DescribeDomain":
|
|
||||||
return {
|
|
||||||
"DomainStatus": {
|
|
||||||
"ARN": domain_arn,
|
|
||||||
"Endpoints": {
|
|
||||||
"vpc": "vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com"
|
|
||||||
},
|
|
||||||
"EngineVersion": "opensearch-version1",
|
|
||||||
"VPCOptions": {
|
|
||||||
"VPCId": "test-vpc-id",
|
|
||||||
},
|
|
||||||
"CognitoOptions": {"Enabled": True},
|
|
||||||
"EncryptionAtRestOptions": {"Enabled": True},
|
|
||||||
"NodeToNodeEncryptionOptions": {"Enabled": True},
|
|
||||||
"AdvancedOptions": {"string": "string"},
|
|
||||||
"LogPublishingOptions": {
|
|
||||||
"string": {
|
|
||||||
"CloudWatchLogsLogGroupArn": "string",
|
|
||||||
"Enabled": True | False,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"ServiceSoftwareOptions": {"UpdateAvailable": True},
|
|
||||||
"DomainEndpointOptions": {"EnforceHTTPS": True},
|
|
||||||
"AdvancedSecurityOptions": {
|
|
||||||
"InternalUserDatabaseEnabled": True,
|
|
||||||
"SAMLOptions": {"Enabled": True},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if operation_name == "ListTags":
|
if operation_name == "ListTags":
|
||||||
return {
|
return {
|
||||||
"TagList": [
|
"TagList": [
|
||||||
@@ -132,45 +105,85 @@ class Test_OpenSearchService_Service:
|
|||||||
aws_provider = set_mocked_aws_provider([])
|
aws_provider = set_mocked_aws_provider([])
|
||||||
opensearch = OpenSearchService(aws_provider)
|
opensearch = OpenSearchService(aws_provider)
|
||||||
assert len(opensearch.opensearch_domains) == 1
|
assert len(opensearch.opensearch_domains) == 1
|
||||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||||
|
|
||||||
# Test OpenSearchService describ domain config
|
# Test OpenSearchService describ domain config
|
||||||
def test_describe_domain_config(self):
|
def test_describe_domain_config(self):
|
||||||
aws_provider = set_mocked_aws_provider([])
|
aws_provider = set_mocked_aws_provider([])
|
||||||
opensearch = OpenSearchService(aws_provider)
|
opensearch = OpenSearchService(aws_provider)
|
||||||
assert len(opensearch.opensearch_domains) == 1
|
assert len(opensearch.opensearch_domains) == 1
|
||||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||||
assert opensearch.opensearch_domains[0].access_policy
|
assert opensearch.opensearch_domains[domain_arn].access_policy
|
||||||
assert opensearch.opensearch_domains[0].logging[0].name == "SEARCH_SLOW_LOGS"
|
assert (
|
||||||
assert opensearch.opensearch_domains[0].logging[0].enabled
|
opensearch.opensearch_domains[domain_arn].logging[0].name
|
||||||
assert opensearch.opensearch_domains[0].logging[1].name == "INDEX_SLOW_LOGS"
|
== "SEARCH_SLOW_LOGS"
|
||||||
assert opensearch.opensearch_domains[0].logging[1].enabled
|
)
|
||||||
assert opensearch.opensearch_domains[0].logging[2].name == "AUDIT_LOGS"
|
assert opensearch.opensearch_domains[domain_arn].logging[0].enabled
|
||||||
assert opensearch.opensearch_domains[0].logging[2].enabled
|
assert (
|
||||||
|
opensearch.opensearch_domains[domain_arn].logging[1].name
|
||||||
|
== "INDEX_SLOW_LOGS"
|
||||||
|
)
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].logging[1].enabled
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].logging[2].name == "AUDIT_LOGS"
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].logging[2].enabled
|
||||||
|
|
||||||
# Test OpenSearchService describ domain
|
# Test OpenSearchService describ domain
|
||||||
|
@mock_aws
|
||||||
def test_describe_domain(self):
|
def test_describe_domain(self):
|
||||||
|
opensearch_client = client("opensearch", region_name=AWS_REGION_EU_WEST_1)
|
||||||
|
domain = opensearch_client.create_domain(
|
||||||
|
DomainName=test_domain_name,
|
||||||
|
EncryptionAtRestOptions={"Enabled": True},
|
||||||
|
ClusterConfig={
|
||||||
|
"InstanceCount": 1,
|
||||||
|
"ZoneAwarenessEnabled": True,
|
||||||
|
"DedicatedMasterCount": 1,
|
||||||
|
},
|
||||||
|
NodeToNodeEncryptionOptions={"Enabled": True},
|
||||||
|
AdvancedSecurityOptions={
|
||||||
|
"Enabled": True,
|
||||||
|
"InternalUserDatabaseEnabled": True,
|
||||||
|
"SAMLOptions": {"Enabled": True},
|
||||||
|
},
|
||||||
|
CognitoOptions={"Enabled": True},
|
||||||
|
VPCOptions={
|
||||||
|
"SubnetIds": ["test-subnet-id"],
|
||||||
|
"SecurityGroupIds": ["test-security-group-id"],
|
||||||
|
},
|
||||||
|
DomainEndpointOptions={"EnforceHTTPS": True},
|
||||||
|
AccessPolicies=policy_json,
|
||||||
|
EngineVersion="opensearch-version1",
|
||||||
|
TagList=[
|
||||||
|
{"Key": "test", "Value": "test"},
|
||||||
|
],
|
||||||
|
)
|
||||||
aws_provider = set_mocked_aws_provider([])
|
aws_provider = set_mocked_aws_provider([])
|
||||||
opensearch = OpenSearchService(aws_provider)
|
opensearch = OpenSearchService(aws_provider)
|
||||||
|
domain_arn = domain["DomainStatus"]["ARN"]
|
||||||
assert len(opensearch.opensearch_domains) == 1
|
assert len(opensearch.opensearch_domains) == 1
|
||||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||||
assert opensearch.opensearch_domains[0].arn == domain_arn
|
assert opensearch.opensearch_domains[domain_arn].arn == domain_arn
|
||||||
assert opensearch.opensearch_domains[0].access_policy
|
assert opensearch.opensearch_domains[domain_arn].access_policy
|
||||||
assert opensearch.opensearch_domains[0].vpc_endpoints == [
|
assert opensearch.opensearch_domains[domain_arn].vpc_endpoints == [
|
||||||
"vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com"
|
"test.eu-west-1.es.amazonaws.com"
|
||||||
]
|
]
|
||||||
assert opensearch.opensearch_domains[0].vpc_id == "test-vpc-id"
|
assert not opensearch.opensearch_domains[domain_arn].vpc_id
|
||||||
assert opensearch.opensearch_domains[0].cognito_options
|
assert opensearch.opensearch_domains[domain_arn].cognito_options
|
||||||
assert opensearch.opensearch_domains[0].encryption_at_rest
|
assert opensearch.opensearch_domains[domain_arn].encryption_at_rest
|
||||||
assert opensearch.opensearch_domains[0].node_to_node_encryption
|
assert opensearch.opensearch_domains[domain_arn].node_to_node_encryption
|
||||||
assert opensearch.opensearch_domains[0].enforce_https
|
assert opensearch.opensearch_domains[domain_arn].enforce_https
|
||||||
assert opensearch.opensearch_domains[0].internal_user_database
|
assert opensearch.opensearch_domains[domain_arn].internal_user_database
|
||||||
assert opensearch.opensearch_domains[0].saml_enabled
|
assert opensearch.opensearch_domains[domain_arn].saml_enabled
|
||||||
assert opensearch.opensearch_domains[0].update_available
|
assert not opensearch.opensearch_domains[domain_arn].update_available
|
||||||
assert opensearch.opensearch_domains[0].version == "opensearch-version1"
|
assert (
|
||||||
assert opensearch.opensearch_domains[0].tags == [
|
opensearch.opensearch_domains[domain_arn].version == "opensearch-version1"
|
||||||
|
)
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].instance_count == 1
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].zone_awareness_enabled
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].dedicated_master_count == 1
|
||||||
|
assert opensearch.opensearch_domains[domain_arn].tags == [
|
||||||
{"Key": "test", "Value": "test"},
|
{"Key": "test", "Value": "test"},
|
||||||
]
|
]
|
||||||
|
|||||||
Reference in New Issue
Block a user