mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-31 05:06:57 +00:00
Compare commits
5 Commits
5.5.1
...
PRWLR-4969
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
185d6c482d | ||
|
|
04993f4e76 | ||
|
|
2bd9f73c1b | ||
|
|
0d743fa179 | ||
|
|
eca94e2e3b |
@@ -13,142 +13,145 @@ class OpenSearchService(AWSService):
|
||||
def __init__(self, provider):
|
||||
# Call AWSService's __init__
|
||||
super().__init__("opensearch", provider)
|
||||
self.opensearch_domains = []
|
||||
self.opensearch_domains = {}
|
||||
self.__threading_call__(self._list_domain_names)
|
||||
self._describe_domain_config(self.regional_clients)
|
||||
self._describe_domain(self.regional_clients)
|
||||
self._list_tags()
|
||||
self.__threading_call__(
|
||||
self._describe_domain_config, self.opensearch_domains.values()
|
||||
)
|
||||
self.__threading_call__(self._describe_domain, self.opensearch_domains.values())
|
||||
self.__threading_call__(self._list_tags, self.opensearch_domains.values())
|
||||
|
||||
def _list_domain_names(self, regional_client):
|
||||
logger.info("OpenSearch - listing domain names...")
|
||||
try:
|
||||
domains = regional_client.list_domain_names()
|
||||
for domain in domains["DomainNames"]:
|
||||
arn = f"arn:{self.audited_partition}:opensearch:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}"
|
||||
arn = f"arn:{self.audited_partition}:es:{regional_client.region}:{self.audited_account}:domain/{domain['DomainName']}"
|
||||
if not self.audit_resources or (
|
||||
is_resource_filtered(arn, self.audit_resources)
|
||||
):
|
||||
self.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
arn=arn,
|
||||
name=domain["DomainName"],
|
||||
region=regional_client.region,
|
||||
)
|
||||
self.opensearch_domains[arn] = OpenSearchDomain(
|
||||
arn=arn,
|
||||
name=domain["DomainName"],
|
||||
region=regional_client.region,
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _describe_domain_config(self, regional_clients):
|
||||
def _describe_domain_config(self, domain):
|
||||
logger.info("OpenSearch - describing domain configurations...")
|
||||
try:
|
||||
for domain in self.opensearch_domains:
|
||||
regional_client = regional_clients[domain.region]
|
||||
describe_domain = regional_client.describe_domain_config(
|
||||
DomainName=domain.name
|
||||
)
|
||||
for logging_key in [
|
||||
"SEARCH_SLOW_LOGS",
|
||||
"INDEX_SLOW_LOGS",
|
||||
"AUDIT_LOGS",
|
||||
]:
|
||||
if logging_key in describe_domain["DomainConfig"].get(
|
||||
"LogPublishingOptions", {}
|
||||
).get("Options", {}):
|
||||
domain.logging.append(
|
||||
PublishingLoggingOption(
|
||||
name=logging_key,
|
||||
enabled=describe_domain["DomainConfig"][
|
||||
"LogPublishingOptions"
|
||||
]["Options"][logging_key]["Enabled"],
|
||||
)
|
||||
regional_client = self.regional_clients[domain.region]
|
||||
describe_domain = regional_client.describe_domain_config(
|
||||
DomainName=domain.name
|
||||
)
|
||||
for logging_key in [
|
||||
"SEARCH_SLOW_LOGS",
|
||||
"INDEX_SLOW_LOGS",
|
||||
"AUDIT_LOGS",
|
||||
]:
|
||||
if logging_key in describe_domain["DomainConfig"].get(
|
||||
"LogPublishingOptions", {}
|
||||
).get("Options", {}):
|
||||
domain.logging.append(
|
||||
PublishingLoggingOption(
|
||||
name=logging_key,
|
||||
enabled=describe_domain["DomainConfig"][
|
||||
"LogPublishingOptions"
|
||||
]["Options"][logging_key]["Enabled"],
|
||||
)
|
||||
try:
|
||||
domain.access_policy = loads(
|
||||
describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
|
||||
)
|
||||
except JSONDecodeError as error:
|
||||
logger.warning(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
continue
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _describe_domain(self, regional_clients):
|
||||
logger.info("OpenSearch - describing domain configurations...")
|
||||
try:
|
||||
for domain in self.opensearch_domains:
|
||||
regional_client = regional_clients[domain.region]
|
||||
describe_domain = regional_client.describe_domain(
|
||||
DomainName=domain.name
|
||||
)
|
||||
domain.arn = describe_domain["DomainStatus"]["ARN"]
|
||||
domain.vpc_endpoints = None
|
||||
if "Endpoints" in describe_domain["DomainStatus"]:
|
||||
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
|
||||
domain.vpc_endpoints = [
|
||||
vpc
|
||||
for vpc in describe_domain["DomainStatus"][
|
||||
"Endpoints"
|
||||
].values()
|
||||
]
|
||||
|
||||
domain.vpc_id = None
|
||||
if "VPCOptions" in describe_domain["DomainStatus"]:
|
||||
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"][
|
||||
"VPCId"
|
||||
]
|
||||
domain.cognito_options = describe_domain["DomainStatus"][
|
||||
"CognitoOptions"
|
||||
]["Enabled"]
|
||||
domain.encryption_at_rest = describe_domain["DomainStatus"][
|
||||
"EncryptionAtRestOptions"
|
||||
]["Enabled"]
|
||||
domain.node_to_node_encryption = describe_domain["DomainStatus"][
|
||||
"NodeToNodeEncryptionOptions"
|
||||
]["Enabled"]
|
||||
domain.enforce_https = describe_domain["DomainStatus"][
|
||||
"DomainEndpointOptions"
|
||||
]["EnforceHTTPS"]
|
||||
domain.internal_user_database = describe_domain["DomainStatus"][
|
||||
"AdvancedSecurityOptions"
|
||||
]["InternalUserDatabaseEnabled"]
|
||||
domain.saml_enabled = (
|
||||
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
|
||||
.get("SAMLOptions", {})
|
||||
.get("Enabled", False)
|
||||
)
|
||||
domain.update_available = describe_domain["DomainStatus"][
|
||||
"ServiceSoftwareOptions"
|
||||
]["UpdateAvailable"]
|
||||
domain.version = describe_domain["DomainStatus"]["EngineVersion"]
|
||||
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
|
||||
"AdvancedSecurityOptions"
|
||||
]["Enabled"]
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _list_tags(self):
|
||||
logger.info("OpenSearch - List Tags...")
|
||||
for domain in self.opensearch_domains:
|
||||
try:
|
||||
regional_client = self.regional_clients[domain.region]
|
||||
response = regional_client.list_tags(
|
||||
ARN=domain.arn,
|
||||
)["TagList"]
|
||||
domain.tags = response
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
domain.access_policy = loads(
|
||||
describe_domain["DomainConfig"]["AccessPolicies"]["Options"]
|
||||
)
|
||||
except JSONDecodeError as error:
|
||||
logger.warning(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _describe_domain(self, domain):
|
||||
logger.info("OpenSearch - describing domain configurations...")
|
||||
try:
|
||||
regional_client = self.regional_clients[domain.region]
|
||||
describe_domain = regional_client.describe_domain(DomainName=domain.name)
|
||||
domain.arn = describe_domain["DomainStatus"]["ARN"]
|
||||
domain.vpc_endpoints = None
|
||||
if "Endpoints" in describe_domain["DomainStatus"]:
|
||||
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
|
||||
domain.vpc_endpoints = [
|
||||
vpc
|
||||
for vpc in describe_domain["DomainStatus"]["Endpoints"].values()
|
||||
]
|
||||
|
||||
domain.vpc_id = None
|
||||
if "VPCOptions" in describe_domain["DomainStatus"]:
|
||||
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"].get(
|
||||
"VPCId", None
|
||||
)
|
||||
domain.cognito_options = describe_domain["DomainStatus"][
|
||||
"CognitoOptions"
|
||||
].get("Enabled", False)
|
||||
domain.encryption_at_rest = describe_domain["DomainStatus"][
|
||||
"EncryptionAtRestOptions"
|
||||
].get("Enabled", False)
|
||||
domain.node_to_node_encryption = describe_domain["DomainStatus"][
|
||||
"NodeToNodeEncryptionOptions"
|
||||
].get("Enabled", False)
|
||||
domain.enforce_https = describe_domain["DomainStatus"][
|
||||
"DomainEndpointOptions"
|
||||
].get("EnforceHTTPS", False)
|
||||
domain.internal_user_database = describe_domain["DomainStatus"][
|
||||
"AdvancedSecurityOptions"
|
||||
].get("InternalUserDatabaseEnabled", False)
|
||||
domain.saml_enabled = (
|
||||
describe_domain["DomainStatus"]["AdvancedSecurityOptions"]
|
||||
.get("SAMLOptions", {})
|
||||
.get("Enabled", False)
|
||||
)
|
||||
domain.update_available = (
|
||||
describe_domain["DomainStatus"]
|
||||
.get("ServiceSoftwareOptions", {"UpdateAvailable": False})
|
||||
.get("UpdateAvailable", False)
|
||||
)
|
||||
domain.version = describe_domain["DomainStatus"].get("EngineVersion", None)
|
||||
domain.advanced_settings_enabled = describe_domain["DomainStatus"][
|
||||
"AdvancedSecurityOptions"
|
||||
].get("Enabled", False)
|
||||
domain.instance_count = describe_domain["DomainStatus"][
|
||||
"ClusterConfig"
|
||||
].get("InstanceCount", None)
|
||||
domain.zone_awareness_enabled = describe_domain["DomainStatus"][
|
||||
"ClusterConfig"
|
||||
].get("ZoneAwarenessEnabled", False)
|
||||
domain.dedicated_master_count = describe_domain["DomainStatus"][
|
||||
"ClusterConfig"
|
||||
].get("DedicatedMasterCount", None)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _list_tags(self, domain):
|
||||
logger.info("OpenSearch - List Tags...")
|
||||
try:
|
||||
regional_client = self.regional_clients[domain.region]
|
||||
response = regional_client.list_tags(
|
||||
ARN=domain.arn,
|
||||
)["TagList"]
|
||||
domain.tags = response
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class PublishingLoggingOption(BaseModel):
|
||||
name: str
|
||||
@@ -171,5 +174,8 @@ class OpenSearchDomain(BaseModel):
|
||||
saml_enabled: bool = None
|
||||
update_available: bool = None
|
||||
version: str = None
|
||||
instance_count: Optional[int]
|
||||
zone_awareness_enabled: Optional[bool]
|
||||
dedicated_master_count: Optional[int]
|
||||
tags: Optional[list] = []
|
||||
advanced_settings_enabled: bool = None
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_access_control_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "opensearch_service_domains_at_least_three_master_nodes",
|
||||
"CheckTitle": "Elasticsearch domains should be configured with at least three dedicated master nodes",
|
||||
"CheckType": [
|
||||
"Software and Configuration Checks/AWS Security Best Practices"
|
||||
],
|
||||
"ServiceName": "elasticsearch",
|
||||
"SubServiceName": "domain",
|
||||
"ResourceIdTemplate": "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AwsElasticsearchDomain",
|
||||
"Description": "This control checks whether Elasticsearch domains are configured with at least three dedicated primary nodes. This control fails if the domain does not use dedicated primary nodes. Using more than three primary nodes may not provide significant additional availability benefits, while incurring extra costs.",
|
||||
"Risk": "Without at least three dedicated master nodes, the Elasticsearch domain's fault-tolerance and ability to handle cluster management operations during node failures may be compromised, leading to potential data unavailability.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config DedicatedMasterEnabled=true,MasterInstanceCount=3",
|
||||
"NativeIaC": "",
|
||||
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-7",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure Elasticsearch domains with at least three dedicated master nodes for high availability and cluster fault tolerance.",
|
||||
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"redundancy"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
opensearch_client,
|
||||
)
|
||||
|
||||
|
||||
class opensearch_service_domains_at_least_three_master_nodes(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
report.resource_arn = domain.arn
|
||||
report.resource_tags = domain.tags
|
||||
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Opensearch domain {domain.name} has only {domain.dedicated_master_count} master nodes."
|
||||
|
||||
if domain.dedicated_master_count >= 3:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Opensearch domain {domain.name} has {domain.dedicated_master_count} master nodes."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled",
|
||||
"CheckTitle": "Elasticsearch/Opensearch domains should have at least three data nodes",
|
||||
"CheckType": [
|
||||
"Software and Configuration Checks/AWS Security Best Practices"
|
||||
],
|
||||
"ServiceName": "opensearch",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:{region}:{account-id}:domain/{domain-name}",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AwsElasticsearchDomain",
|
||||
"Description": "This control checks whether Elasticsearch/Opensearch domains are configured with at least three data nodes and zoneAwarenessEnabled is true.",
|
||||
"Risk": "Without at least three data nodes, the Elasticsearch/Opensearch domain may not be fault-tolerant, leading to a higher risk of data loss or unavailability in case of node failure.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config InstanceCount=3,ZoneAwarenessEnabled=true",
|
||||
"NativeIaC": "",
|
||||
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-6",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Modify the Elasticsearch/Opensearch domain to ensure at least three data nodes and enable zone awareness for high availability.",
|
||||
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"redundancy"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
opensearch_client,
|
||||
)
|
||||
|
||||
|
||||
class opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
report.resource_arn = domain.arn
|
||||
report.resource_tags = domain.tags
|
||||
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Opensearch domain {domain.name} does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
|
||||
|
||||
if domain.instance_count >= 3 and domain.zone_awareness_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes and zone awareness enabled."
|
||||
elif domain.instance_count >= 3 and not domain.zone_awareness_enabled:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Opensearch domain {domain.name} has {domain.instance_count} data nodes, but zone awareness is not enabled."
|
||||
elif domain.instance_count < 3 and domain.zone_awareness_enabled:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Opensearch domain {domain.name} has zone awareness enabled, but only {domain.instance_count} data nodes."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_audit_logging_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_cloudwatch_logging_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_encryption_at_rest_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_https_communications_enforced(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_internal_user_database_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_node_to_node_encryption_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_not_publicly_accessible(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -7,14 +7,14 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_updated_to_the_latest_service_software_version(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
report.resource_arn = domain.arn
|
||||
report.resource_tags = domain.tags
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available."
|
||||
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} does not have internal updates available."
|
||||
if domain.update_available:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Opensearch domain {domain.name} with version {domain.version} has internal updates available."
|
||||
|
||||
@@ -7,7 +7,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
|
||||
class opensearch_service_domains_use_cognito_authentication_for_kibana(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for domain in opensearch_client.opensearch_domains:
|
||||
for domain in opensearch_client.opensearch_domains.values():
|
||||
report = Check_Report_AWS(self.metadata())
|
||||
report.region = domain.region
|
||||
report.resource_id = domain.name
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_access_control_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||
opensearch_service_domains_access_control_enabled,
|
||||
@@ -30,22 +36,26 @@ class Test_opensearch_service_domains_access_control_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_fine_grained_access_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
AdvancedSecurityOptions={"Enabled": False},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].advanced_settings_enabled = False
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||
opensearch_service_domains_access_control_enabled,
|
||||
@@ -55,29 +65,36 @@ class Test_opensearch_service_domains_access_control_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"does not have fine grained access control enabled.",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
f"Opensearch domain {domain["DomainStatus"]["DomainName"]} does not have fine grained access control enabled."
|
||||
== result[0].status_extended
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
def test_logging_AUDIT_LOGS_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
@mock_aws
|
||||
def test_fine_grained_access_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
AdvancedSecurityOptions={"Enabled": True},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].advanced_settings_enabled = True
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_access_control_enabled.opensearch_service_domains_access_control_enabled import (
|
||||
opensearch_service_domains_access_control_enabled,
|
||||
@@ -87,8 +104,12 @@ class Test_opensearch_service_domains_access_control_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has fine grained access control enabled.", result[0].status_extended
|
||||
assert (
|
||||
f"Opensearch domain {domain["DomainStatus"]["DomainName"]} has fine grained access control enabled."
|
||||
== result[0].status_extended
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_at_least_three_master_nodes:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||
opensearch_service_domains_at_least_three_master_nodes,
|
||||
)
|
||||
|
||||
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_less_than_three_master_nodes(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-2-nodes",
|
||||
ClusterConfig={"DedicatedMasterCount": 2},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||
opensearch_service_domains_at_least_three_master_nodes,
|
||||
)
|
||||
|
||||
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-2-nodes has only 2 master nodes."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_at_least_three_master_nodes(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-3-nodes",
|
||||
ClusterConfig={"DedicatedMasterCount": 3},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
|
||||
opensearch_service_domains_at_least_three_master_nodes,
|
||||
)
|
||||
|
||||
check = opensearch_service_domains_at_least_three_master_nodes()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-3-nodes has 3 master nodes."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
@@ -0,0 +1,203 @@
|
||||
from unittest import mock
|
||||
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_instance_count_less_than_three_zoneawareness_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 2},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain has zone awareness enabled, but only 2 data nodes."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_instance_count_more_than_three_zoneawareness_not_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 3},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain has 3 data nodes, but zone awareness is not enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_instance_count_less_than_three_zoneawareness_not_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
ClusterConfig={"ZoneAwarenessEnabled": False, "InstanceCount": 2},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain does not meet the requirements: it has less than 3 data nodes and zone awareness is not enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_logging_instance_count_more_than_three_zoneawareness_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
ClusterConfig={"ZoneAwarenessEnabled": True, "InstanceCount": 3},
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled.opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled import (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_at_least_three_nodes_and_zoneawareness_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain has 3 data nodes and zone awareness enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
@@ -1,27 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
PublishingLoggingOption,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_audit_logging_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||
opensearch_service_domains_audit_logging_enabled,
|
||||
@@ -31,22 +36,29 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_logging_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-logging",
|
||||
LogPublishingOptions={
|
||||
"SEARCH_SLOW_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*"
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||
opensearch_service_domains_audit_logging_enabled,
|
||||
@@ -56,29 +68,40 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search("AUDIT_LOGS disabled", result[0].status_extended)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
def test_logging_AUDIT_LOGS_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-logging AUDIT_LOGS disabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_logging_AUDIT_LOGS_enabled(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-logging",
|
||||
LogPublishingOptions={
|
||||
"AUDIT_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:audit-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
opensearch_client.opensearch_domains[0].logging.append(
|
||||
PublishingLoggingOption(name="AUDIT_LOGS", enabled=True)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_audit_logging_enabled.opensearch_service_domains_audit_logging_enabled import (
|
||||
opensearch_service_domains_audit_logging_enabled,
|
||||
@@ -88,6 +111,12 @@ class Test_opensearch_service_domains_audit_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search("AUDIT_LOGS enabled", result[0].status_extended)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-logging AUDIT_LOGS enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@@ -1,27 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
PublishingLoggingOption,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||
@@ -31,22 +36,30 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_logging_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-logging",
|
||||
LogPublishingOptions={
|
||||
"AUDIT_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||
@@ -56,32 +69,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS disabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_logging_SEARCH_SLOW_LOGS_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-index-logging",
|
||||
LogPublishingOptions={
|
||||
"SEARCH_SLOW_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
opensearch_client.opensearch_domains[0].logging.append(
|
||||
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||
@@ -91,32 +112,40 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-index-logging SEARCH_SLOW_LOGS enabled but INDEX_SLOW_LOGS disabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_logging_INDEX_SLOW_LOGS_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-search-logging",
|
||||
LogPublishingOptions={
|
||||
"INDEX_SLOW_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
opensearch_client.opensearch_domains[0].logging.append(
|
||||
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||
@@ -126,34 +155,44 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-search-logging INDEX_SLOW_LOGS enabled but SEARCH_SLOW_LOGS disabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_logging_INDEX_SLOW_LOGS_and_SEARCH_SLOW_LOGS_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name, region=AWS_REGION_EU_WEST_1, arn=domain_arn
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-logging",
|
||||
LogPublishingOptions={
|
||||
"SEARCH_SLOW_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
"INDEX_SLOW_LOGS": {
|
||||
"CloudWatchLogsLogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:search-logs:*",
|
||||
"Enabled": True,
|
||||
},
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
logging_options = [
|
||||
PublishingLoggingOption(name="INDEX_SLOW_LOGS", enabled=True),
|
||||
PublishingLoggingOption(name="SEARCH_SLOW_LOGS", enabled=True),
|
||||
]
|
||||
opensearch_client.opensearch_domains[0].logging.extend(logging_options)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_cloudwatch_logging_enabled.opensearch_service_domains_cloudwatch_logging_enabled import (
|
||||
opensearch_service_domains_cloudwatch_logging_enabled,
|
||||
@@ -163,9 +202,12 @@ class Test_opensearch_service_domains_cloudwatch_logging_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-logging SEARCH_SLOW_LOGS and INDEX_SLOW_LOGS enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||
opensearch_service_domains_encryption_at_rest_enabled,
|
||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_encryption_at_rest_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
encryption_at_rest=False,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-encryption",
|
||||
EncryptionAtRestOptions={"Enabled": False},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||
opensearch_service_domains_encryption_at_rest_enabled,
|
||||
@@ -58,31 +64,35 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"does not have encryption at-rest enabled", result[0].status_extended
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-encryption does not have encryption at-rest enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_encryption_at_rest_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
encryption_at_rest=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-encryption-at-rest",
|
||||
EncryptionAtRestOptions={"Enabled": True},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_encryption_at_rest_enabled.opensearch_service_domains_encryption_at_rest_enabled import (
|
||||
opensearch_service_domains_encryption_at_rest_enabled,
|
||||
@@ -92,6 +102,12 @@ class Test_opensearch_service_domains_encryption_at_rest_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search("has encryption at-rest enabled", result[0].status_extended)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-encryption-at-rest has encryption at-rest enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_https_communications_enforced:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||
opensearch_service_domains_https_communications_enforced,
|
||||
@@ -30,25 +36,27 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_https_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
enforce_https=False,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-https",
|
||||
DomainEndpointOptions={
|
||||
"EnforceHTTPS": False,
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||
opensearch_service_domains_https_communications_enforced,
|
||||
@@ -58,31 +66,37 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"does not have enforce HTTPS enabled", result[0].status_extended
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-https does not have enforce HTTPS enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_https_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
enforce_https=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-https",
|
||||
DomainEndpointOptions={
|
||||
"EnforceHTTPS": True,
|
||||
},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_https_communications_enforced.opensearch_service_domains_https_communications_enforced import (
|
||||
opensearch_service_domains_https_communications_enforced,
|
||||
@@ -92,6 +106,12 @@ class Test_opensearch_service_domains_https_communications_enforced:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search("has enforce HTTPS enabled", result[0].status_extended)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-https has enforce HTTPS enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_internal_user_database_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||
opensearch_service_domains_internal_user_database_enabled,
|
||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_internal_database_disabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
internal_user_database=False,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
AdvancedSecurityOptions={"InternalUserDatabaseEnabled": False},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||
opensearch_service_domains_internal_user_database_enabled,
|
||||
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"does not have internal user database enabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain does not have internal user database enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_internal_database_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
internal_user_database=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain",
|
||||
AdvancedSecurityOptions={"InternalUserDatabaseEnabled": True},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_internal_user_database_enabled.opensearch_service_domains_internal_user_database_enabled import (
|
||||
opensearch_service_domains_internal_user_database_enabled,
|
||||
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_internal_user_database_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"has internal user database enabled", result[0].status_extended
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain has internal user database enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@@ -1,26 +1,32 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||
@@ -30,25 +36,25 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_no_encryption_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
node_to_node_encryption=False,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-encryption",
|
||||
NodeToNodeEncryptionOptions={"Enabled": False},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||
@@ -58,32 +64,35 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
"does not have node-to-node encryption enabled",
|
||||
result[0].status_extended,
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-no-encryption does not have node-to-node encryption enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_encryption_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
node_to_node_encryption=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-encryption",
|
||||
NodeToNodeEncryptionOptions={"Enabled": True},
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_node_to_node_encryption_enabled.opensearch_service_domains_node_to_node_encryption_enabled import (
|
||||
opensearch_service_domains_node_to_node_encryption_enabled,
|
||||
@@ -93,8 +102,12 @@ class Test_opensearch_service_domains_node_to_node_encryption_enabled:
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"has node-to-node encryption enabled", result[0].status_extended
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-encryption has node-to-node encryption enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@@ -306,7 +306,7 @@ class Test_opensearch_service_domains_not_publicly_accessible:
|
||||
|
||||
def test_domain_inside_vpc(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains = {}
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
|
||||
|
||||
@@ -324,13 +324,12 @@ class Test_opensearch_service_domains_not_publicly_accessible:
|
||||
opensearch_service_domains_not_publicly_accessible,
|
||||
)
|
||||
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_US_WEST_2,
|
||||
arn=f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}",
|
||||
vpc_id="vpc-123456",
|
||||
)
|
||||
domain_arn = f"arn:aws:es:{AWS_REGION_US_WEST_2}:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
opensearch_client.opensearch_domains[domain_arn] = OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_US_WEST_2,
|
||||
arn=domain_arn,
|
||||
vpc_id="vpc-123456",
|
||||
)
|
||||
|
||||
check = opensearch_service_domains_not_publicly_accessible()
|
||||
|
||||
@@ -1,26 +1,76 @@
|
||||
from re import search
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
import botocore
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
make_api_call = botocore.client.BaseClient._make_api_call
|
||||
|
||||
|
||||
def mock_make_api_call(self, operation_name, kwarg):
|
||||
if operation_name == "ListDomainNames":
|
||||
return {
|
||||
"DomainNames": [
|
||||
{
|
||||
"DomainName": "test-domain-updates",
|
||||
},
|
||||
]
|
||||
}
|
||||
if operation_name == "DescribeDomain":
|
||||
return {
|
||||
"DomainStatus": {
|
||||
"DomainName": "test-domain-updates",
|
||||
"EngineVersion": "OpenSearch2.0",
|
||||
"ServiceSoftwareOptions": {
|
||||
"UpdateAvailable": True,
|
||||
},
|
||||
"ARN": f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates",
|
||||
"ClusterConfig": {
|
||||
"InstanceCount": 1,
|
||||
},
|
||||
"AdvancedSecurityOptions": {
|
||||
"InternalUserDatabaseEnabled": False,
|
||||
},
|
||||
"CognitoOptions": {
|
||||
"Enabled": False,
|
||||
},
|
||||
"EncryptionAtRestOptions": {
|
||||
"Enabled": False,
|
||||
},
|
||||
"NodeToNodeEncryptionOptions": {
|
||||
"Enabled": False,
|
||||
},
|
||||
"DomainEndpointOptions": {
|
||||
"EnforceHTTPS": False,
|
||||
},
|
||||
}
|
||||
}
|
||||
return make_api_call(self, operation_name, kwarg)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_updated_to_the_latest_service_software_version:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||
@@ -32,61 +82,22 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_internal_update_available(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
update_available=False,
|
||||
)
|
||||
@mock_aws
|
||||
@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_updates_available(self):
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||
new=opensearch_client,
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"does not have internal updates available", result[0].status_extended
|
||||
)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
def test_internal_database_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
update_available=True,
|
||||
)
|
||||
)
|
||||
opensearch_client.opensearch_domains[0].logging = []
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
new=opensearch_client,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||
@@ -98,6 +109,52 @@ class Test_opensearch_service_domains_updated_to_the_latest_service_software_ver
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search("has internal updates available", result[0].status_extended)
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "Opensearch domain test-domain-updates with version OpenSearch2.0 has internal updates available."
|
||||
)
|
||||
assert result[0].resource_id == "test-domain-updates"
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/test-domain-updates"
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_no_updates_availables(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-updates",
|
||||
SoftwareUpdateOptions={"AutoSoftwareUpdateEnabled": True},
|
||||
)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_client",
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_updated_to_the_latest_service_software_version.opensearch_service_domains_updated_to_the_latest_service_software_version import (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version,
|
||||
)
|
||||
|
||||
check = (
|
||||
opensearch_service_domains_updated_to_the_latest_service_software_version()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Opensearch domain test-domain-no-updates with version {domain["DomainStatus"]["EngineVersion"]} does not have internal updates available."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
|
||||
@@ -1,25 +1,32 @@
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchDomain,
|
||||
)
|
||||
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
domain_name = "test-domain"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}"
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_REGION_US_EAST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||
@mock_aws
|
||||
def test_no_domains(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||
@@ -29,25 +36,24 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_neither_cognito_nor_saml_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
cognito_options=False,
|
||||
saml_enabled=False,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-no-cognito-no-saml",
|
||||
)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||
@@ -59,31 +65,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Opensearch domain {domain_name} has neither Amazon Cognito nor SAML authentication for Kibana enabled."
|
||||
== "Opensearch domain test-domain-no-cognito-no-saml has neither Amazon Cognito nor SAML authentication for Kibana enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert result[0].resource_tags == []
|
||||
|
||||
@mock_aws
|
||||
def test_cognito_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
cognito_options=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-cognito",
|
||||
CognitoOptions={"Enabled": True},
|
||||
)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||
@@ -95,32 +103,33 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||
== "Opensearch domain test-domain-cognito has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert result[0].resource_tags == []
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@mock_aws
|
||||
def test_saml_enabled(self):
|
||||
opensearch_client = mock.MagicMock
|
||||
opensearch_client.opensearch_domains = []
|
||||
opensearch_client.opensearch_domains.append(
|
||||
OpenSearchDomain(
|
||||
name=domain_name,
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
arn=domain_arn,
|
||||
saml_enabled=True,
|
||||
)
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName="test-domain-saml",
|
||||
AdvancedSecurityOptions={"SAMLOptions": {"Enabled": True}},
|
||||
)
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
)
|
||||
|
||||
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service.OpenSearchService",
|
||||
opensearch_client,
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mocked_aws_provider,
|
||||
), mock.patch(
|
||||
"prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_client",
|
||||
new=opensearch_client,
|
||||
new=OpenSearchService(mocked_aws_provider),
|
||||
):
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service_domains_use_cognito_authentication_for_kibana.opensearch_service_domains_use_cognito_authentication_for_kibana import (
|
||||
opensearch_service_domains_use_cognito_authentication_for_kibana,
|
||||
@@ -132,10 +141,10 @@ class Test_opensearch_service_domains_use_cognito_authentication_for_kibana:
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Opensearch domain {domain_name} has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||
== "Opensearch domain test-domain-saml has either Amazon Cognito or SAML authentication for Kibana enabled."
|
||||
)
|
||||
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
|
||||
)
|
||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
||||
assert result[0].resource_id == domain_name
|
||||
assert result[0].resource_arn == domain_arn
|
||||
assert result[0].resource_tags == []
|
||||
assert result[0].resource_arn == domain_arn
|
||||
|
||||
@@ -2,6 +2,8 @@ from json import dumps
|
||||
from unittest.mock import patch
|
||||
|
||||
import botocore
|
||||
from boto3 import client
|
||||
from moto import mock_aws
|
||||
|
||||
from prowler.providers.aws.services.opensearch.opensearch_service import (
|
||||
OpenSearchService,
|
||||
@@ -13,7 +15,7 @@ from tests.providers.aws.utils import (
|
||||
)
|
||||
|
||||
test_domain_name = "test"
|
||||
domain_arn = f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}"
|
||||
domain_arn = f"arn:aws:es:eu-west-1:{AWS_ACCOUNT_NUMBER}:domain/{test_domain_name}"
|
||||
|
||||
policy_data = {
|
||||
"Version": "2012-10-17",
|
||||
@@ -56,35 +58,6 @@ def mock_make_api_call(self, operation_name, kwarg):
|
||||
},
|
||||
}
|
||||
}
|
||||
if operation_name == "DescribeDomain":
|
||||
return {
|
||||
"DomainStatus": {
|
||||
"ARN": domain_arn,
|
||||
"Endpoints": {
|
||||
"vpc": "vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com"
|
||||
},
|
||||
"EngineVersion": "opensearch-version1",
|
||||
"VPCOptions": {
|
||||
"VPCId": "test-vpc-id",
|
||||
},
|
||||
"CognitoOptions": {"Enabled": True},
|
||||
"EncryptionAtRestOptions": {"Enabled": True},
|
||||
"NodeToNodeEncryptionOptions": {"Enabled": True},
|
||||
"AdvancedOptions": {"string": "string"},
|
||||
"LogPublishingOptions": {
|
||||
"string": {
|
||||
"CloudWatchLogsLogGroupArn": "string",
|
||||
"Enabled": True | False,
|
||||
}
|
||||
},
|
||||
"ServiceSoftwareOptions": {"UpdateAvailable": True},
|
||||
"DomainEndpointOptions": {"EnforceHTTPS": True},
|
||||
"AdvancedSecurityOptions": {
|
||||
"InternalUserDatabaseEnabled": True,
|
||||
"SAMLOptions": {"Enabled": True},
|
||||
},
|
||||
}
|
||||
}
|
||||
if operation_name == "ListTags":
|
||||
return {
|
||||
"TagList": [
|
||||
@@ -132,45 +105,85 @@ class Test_OpenSearchService_Service:
|
||||
aws_provider = set_mocked_aws_provider([])
|
||||
opensearch = OpenSearchService(aws_provider)
|
||||
assert len(opensearch.opensearch_domains) == 1
|
||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
||||
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||
|
||||
# Test OpenSearchService describ domain config
|
||||
def test_describe_domain_config(self):
|
||||
aws_provider = set_mocked_aws_provider([])
|
||||
opensearch = OpenSearchService(aws_provider)
|
||||
assert len(opensearch.opensearch_domains) == 1
|
||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
||||
assert opensearch.opensearch_domains[0].access_policy
|
||||
assert opensearch.opensearch_domains[0].logging[0].name == "SEARCH_SLOW_LOGS"
|
||||
assert opensearch.opensearch_domains[0].logging[0].enabled
|
||||
assert opensearch.opensearch_domains[0].logging[1].name == "INDEX_SLOW_LOGS"
|
||||
assert opensearch.opensearch_domains[0].logging[1].enabled
|
||||
assert opensearch.opensearch_domains[0].logging[2].name == "AUDIT_LOGS"
|
||||
assert opensearch.opensearch_domains[0].logging[2].enabled
|
||||
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||
assert opensearch.opensearch_domains[domain_arn].access_policy
|
||||
assert (
|
||||
opensearch.opensearch_domains[domain_arn].logging[0].name
|
||||
== "SEARCH_SLOW_LOGS"
|
||||
)
|
||||
assert opensearch.opensearch_domains[domain_arn].logging[0].enabled
|
||||
assert (
|
||||
opensearch.opensearch_domains[domain_arn].logging[1].name
|
||||
== "INDEX_SLOW_LOGS"
|
||||
)
|
||||
assert opensearch.opensearch_domains[domain_arn].logging[1].enabled
|
||||
assert opensearch.opensearch_domains[domain_arn].logging[2].name == "AUDIT_LOGS"
|
||||
assert opensearch.opensearch_domains[domain_arn].logging[2].enabled
|
||||
|
||||
# Test OpenSearchService describ domain
|
||||
@mock_aws
|
||||
def test_describe_domain(self):
|
||||
opensearch_client = client("opensearch", region_name=AWS_REGION_EU_WEST_1)
|
||||
domain = opensearch_client.create_domain(
|
||||
DomainName=test_domain_name,
|
||||
EncryptionAtRestOptions={"Enabled": True},
|
||||
ClusterConfig={
|
||||
"InstanceCount": 1,
|
||||
"ZoneAwarenessEnabled": True,
|
||||
"DedicatedMasterCount": 1,
|
||||
},
|
||||
NodeToNodeEncryptionOptions={"Enabled": True},
|
||||
AdvancedSecurityOptions={
|
||||
"Enabled": True,
|
||||
"InternalUserDatabaseEnabled": True,
|
||||
"SAMLOptions": {"Enabled": True},
|
||||
},
|
||||
CognitoOptions={"Enabled": True},
|
||||
VPCOptions={
|
||||
"SubnetIds": ["test-subnet-id"],
|
||||
"SecurityGroupIds": ["test-security-group-id"],
|
||||
},
|
||||
DomainEndpointOptions={"EnforceHTTPS": True},
|
||||
AccessPolicies=policy_json,
|
||||
EngineVersion="opensearch-version1",
|
||||
TagList=[
|
||||
{"Key": "test", "Value": "test"},
|
||||
],
|
||||
)
|
||||
aws_provider = set_mocked_aws_provider([])
|
||||
opensearch = OpenSearchService(aws_provider)
|
||||
domain_arn = domain["DomainStatus"]["ARN"]
|
||||
assert len(opensearch.opensearch_domains) == 1
|
||||
assert opensearch.opensearch_domains[0].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[0].region == AWS_REGION_EU_WEST_1
|
||||
assert opensearch.opensearch_domains[0].arn == domain_arn
|
||||
assert opensearch.opensearch_domains[0].access_policy
|
||||
assert opensearch.opensearch_domains[0].vpc_endpoints == [
|
||||
"vpc-endpoint-h2dsd34efgyghrtguk5gt6j2foh4.us-east-1.es.amazonaws.com"
|
||||
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
|
||||
assert opensearch.opensearch_domains[domain_arn].region == AWS_REGION_EU_WEST_1
|
||||
assert opensearch.opensearch_domains[domain_arn].arn == domain_arn
|
||||
assert opensearch.opensearch_domains[domain_arn].access_policy
|
||||
assert opensearch.opensearch_domains[domain_arn].vpc_endpoints == [
|
||||
"test.eu-west-1.es.amazonaws.com"
|
||||
]
|
||||
assert opensearch.opensearch_domains[0].vpc_id == "test-vpc-id"
|
||||
assert opensearch.opensearch_domains[0].cognito_options
|
||||
assert opensearch.opensearch_domains[0].encryption_at_rest
|
||||
assert opensearch.opensearch_domains[0].node_to_node_encryption
|
||||
assert opensearch.opensearch_domains[0].enforce_https
|
||||
assert opensearch.opensearch_domains[0].internal_user_database
|
||||
assert opensearch.opensearch_domains[0].saml_enabled
|
||||
assert opensearch.opensearch_domains[0].update_available
|
||||
assert opensearch.opensearch_domains[0].version == "opensearch-version1"
|
||||
assert opensearch.opensearch_domains[0].tags == [
|
||||
assert not opensearch.opensearch_domains[domain_arn].vpc_id
|
||||
assert opensearch.opensearch_domains[domain_arn].cognito_options
|
||||
assert opensearch.opensearch_domains[domain_arn].encryption_at_rest
|
||||
assert opensearch.opensearch_domains[domain_arn].node_to_node_encryption
|
||||
assert opensearch.opensearch_domains[domain_arn].enforce_https
|
||||
assert opensearch.opensearch_domains[domain_arn].internal_user_database
|
||||
assert opensearch.opensearch_domains[domain_arn].saml_enabled
|
||||
assert not opensearch.opensearch_domains[domain_arn].update_available
|
||||
assert (
|
||||
opensearch.opensearch_domains[domain_arn].version == "opensearch-version1"
|
||||
)
|
||||
assert opensearch.opensearch_domains[domain_arn].instance_count == 1
|
||||
assert opensearch.opensearch_domains[domain_arn].zone_awareness_enabled
|
||||
assert opensearch.opensearch_domains[domain_arn].dedicated_master_count == 1
|
||||
assert opensearch.opensearch_domains[domain_arn].tags == [
|
||||
{"Key": "test", "Value": "test"},
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user