feat(opensearch): Add check logic with respective unit tests. Add metadata too

This commit is contained in:
MarioRgzLpz
2024-10-10 19:56:36 +02:00
parent 04993f4e76
commit 185d6c482d
4 changed files with 176 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "opensearch_service_domains_at_least_three_master_nodes",
"CheckTitle": "Elasticsearch domains should be configured with at least three dedicated master nodes",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "elasticsearch",
"SubServiceName": "domain",
"ResourceIdTemplate": "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
"Severity": "medium",
"ResourceType": "AwsElasticsearchDomain",
"Description": "This control checks whether Elasticsearch domains are configured with at least three dedicated primary nodes. This control fails if the domain does not use dedicated primary nodes. Using more than three primary nodes may not provide significant additional availability benefits, while incurring extra costs.",
"Risk": "Without at least three dedicated master nodes, the Elasticsearch domain's fault-tolerance and ability to handle cluster management operations during node failures may be compromised, leading to potential data unavailability.",
"RelatedUrl": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/what-is.html",
"Remediation": {
"Code": {
"CLI": "aws opensearch update-domain-config --domain-name <domain-name> --cluster-config DedicatedMasterEnabled=true,MasterInstanceCount=3",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/es-controls.html#es-7",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure Elasticsearch domains with at least three dedicated master nodes for high availability and cluster fault tolerance.",
"Url": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/managedomains-configuration-changes.html"
}
},
"Categories": [
"redundancy"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.opensearch.opensearch_client import (
opensearch_client,
)
class opensearch_service_domains_at_least_three_master_nodes(Check):
def execute(self):
findings = []
for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(self.metadata())
report.region = domain.region
report.resource_id = domain.name
report.resource_arn = domain.arn
report.resource_tags = domain.tags
report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} has only {domain.dedicated_master_count} master nodes."
if domain.dedicated_master_count >= 3:
report.status = "PASS"
report.status_extended = f"Opensearch domain {domain.name} has {domain.dedicated_master_count} master nodes."
findings.append(report)
return findings

View File

@@ -0,0 +1,115 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_opensearch_service_domains_at_least_three_master_nodes:
@mock_aws
def test_no_domains(self):
client("opensearch", region_name=AWS_REGION_US_EAST_1)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_less_than_three_master_nodes(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-2-nodes",
ClusterConfig={"DedicatedMasterCount": 2},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Opensearch domain test-domain-2-nodes has only 2 master nodes."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)
@mock_aws
def test_at_least_three_master_nodes(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_EAST_1)
domain = opensearch_client.create_domain(
DomainName="test-domain-3-nodes",
ClusterConfig={"DedicatedMasterCount": 3},
)
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
mocked_aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mocked_aws_provider,
), mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes.opensearch_client",
new=OpenSearchService(mocked_aws_provider),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_at_least_three_master_nodes.opensearch_service_domains_at_least_three_master_nodes import (
opensearch_service_domains_at_least_three_master_nodes,
)
check = opensearch_service_domains_at_least_three_master_nodes()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Opensearch domain test-domain-3-nodes has 3 master nodes."
)
assert result[0].resource_id == domain["DomainStatus"]["DomainName"]
assert (
result[0].resource_arn
== f"arn:aws:es:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:domain/{domain["DomainStatus"]["DomainName"]}"
)