diff --git a/docs/user-guide/cli/tutorials/configuration_file.mdx b/docs/user-guide/cli/tutorials/configuration_file.mdx index e5039bff21..923bef7ce1 100644 --- a/docs/user-guide/cli/tutorials/configuration_file.mdx +++ b/docs/user-guide/cli/tutorials/configuration_file.mdx @@ -73,6 +73,7 @@ The following list includes all the AWS checks with configurable variables that | `ssm_documents_set_as_public` | `trusted_account_ids` | List of Strings | | `vpc_endpoint_connections_trust_boundaries` | `trusted_account_ids` | List of Strings | | `vpc_endpoint_services_allowed_principals_trust_boundaries` | `trusted_account_ids` | List of Strings | +| `opensearch_service_domains_not_publicly_accessible` | `trusted_ips` | List of Strings | ## Azure diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 90e0e86103..925062b178 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `entra_conditional_access_policy_approved_client_app_required_for_mobile` check for m365 provider [(#10216)](https://github.com/prowler-cloud/prowler/pull/10216) - `entra_conditional_access_policy_compliant_device_hybrid_joined_device_mfa_required` check for M365 provider [(#10197)](https://github.com/prowler-cloud/prowler/pull/10197) +- Add `trusted_ips` configurable option to `opensearch_service_domains_not_publicly_accessible` check to reduce false positives on IP-restricted policies [(#8631)](https://github.com/prowler-cloud/prowler/pull/8631) ### 🔄 Changed diff --git a/prowler/config/config.yaml b/prowler/config/config.yaml index 4682afc7af..dcff0a6169 100644 --- a/prowler/config/config.yaml +++ b/prowler/config/config.yaml @@ -72,6 +72,11 @@ aws: # trusted_account_ids : ["123456789012", "098765432109", "678901234567"] trusted_account_ids: [] + # AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible) + # Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g. + # trusted_ips: ["1.2.3.4", "10.0.0.0/8"] + trusted_ips: [] + # AWS Cloudwatch Configuration # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days log_group_retention_days: 365 diff --git a/prowler/providers/aws/services/iam/lib/policy.py b/prowler/providers/aws/services/iam/lib/policy.py index 8442fefa66..d8806f280b 100644 --- a/prowler/providers/aws/services/iam/lib/policy.py +++ b/prowler/providers/aws/services/iam/lib/policy.py @@ -380,6 +380,56 @@ def is_condition_restricting_from_private_ip(condition_statement: dict) -> bool: return is_from_private_ip +def is_condition_restricting_to_trusted_ips( + condition_statement: dict, trusted_ips: list = None +) -> bool: + """Check if the policy condition restricts access to trusted IP addresses. + + Keyword arguments: + condition_statement -- The policy condition to check. For example: + { + "IpAddress": { + "aws:SourceIp": "X.X.X.X" + } + } + trusted_ips -- A list of trusted IP addresses or CIDR ranges. + """ + if not trusted_ips: + return False + + try: + CONDITION_OPERATOR = "IpAddress" + CONDITION_KEY = "aws:sourceip" + + if condition_statement.get(CONDITION_OPERATOR, {}): + condition_statement[CONDITION_OPERATOR] = { + k.lower(): v for k, v in condition_statement[CONDITION_OPERATOR].items() + } + + if condition_statement[CONDITION_OPERATOR].get(CONDITION_KEY, ""): + if not isinstance( + condition_statement[CONDITION_OPERATOR][CONDITION_KEY], list + ): + condition_statement[CONDITION_OPERATOR][CONDITION_KEY] = [ + condition_statement[CONDITION_OPERATOR][CONDITION_KEY] + ] + + trusted_ips_set = {ip.lower() for ip in trusted_ips} + for ip in condition_statement[CONDITION_OPERATOR][CONDITION_KEY]: + if ip == "*" or ip == "0.0.0.0/0": + return False + if ip not in trusted_ips_set: + return False + return True + + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + return False + + # TODO: Add logic for deny statements def is_policy_public( policy: dict, @@ -388,6 +438,7 @@ def is_policy_public( not_allowed_actions: list = [], check_cross_service_confused_deputy=False, trusted_account_ids: list = None, + trusted_ips: list = None, ) -> bool: """ Check if the policy allows public access to the resource. @@ -399,6 +450,7 @@ def is_policy_public( not_allowed_actions (list): List of actions that are not allowed, default: []. If not_allowed_actions is empty, the function will not consider the actions in the policy. check_cross_service_confused_deputy (bool): If the policy is checked for cross-service confused deputy, default: False trusted_account_ids (list): A list of trusted accound ids to reduce false positives on cross-account checks + trusted_ips (list): A list of trusted IP addresses or CIDR ranges to reduce false positives on IP-based checks Returns: bool: True if the policy allows public access, False otherwise """ @@ -511,6 +563,10 @@ def is_policy_public( and not is_condition_restricting_from_private_ip( statement.get("Condition", {}) ) + and not is_condition_restricting_to_trusted_ips( + statement.get("Condition", {}), + trusted_ips, + ) ) if is_public: break diff --git a/prowler/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible.py b/prowler/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible.py index 19a890620b..d02ed12a5a 100644 --- a/prowler/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible.py +++ b/prowler/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible.py @@ -8,6 +8,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import ( class opensearch_service_domains_not_publicly_accessible(Check): def execute(self): findings = [] + trusted_ips = opensearch_client.audit_config.get("trusted_ips", []) for domain in opensearch_client.opensearch_domains.values(): report = Check_Report_AWS(metadata=self.metadata(), resource=domain) report.status = "PASS" @@ -18,7 +19,9 @@ class opensearch_service_domains_not_publicly_accessible(Check): if domain.vpc_id: report.status_extended = f"Opensearch domain {domain.name} is in a VPC, then it is not publicly accessible." elif domain.access_policy is not None and is_policy_public( - domain.access_policy, opensearch_client.audited_account + domain.access_policy, + opensearch_client.audited_account, + trusted_ips=trusted_ips, ): report.status = "FAIL" report.status_extended = f"Opensearch domain {domain.name} is publicly accessible via access policy." diff --git a/tests/config/config_test.py b/tests/config/config_test.py index f67f3fda35..08b1f7d5e0 100644 --- a/tests/config/config_test.py +++ b/tests/config/config_test.py @@ -31,6 +31,7 @@ old_config_aws = { "ec2_allowed_interface_types": ["api_gateway_managed", "vpc_endpoint"], "ec2_allowed_instance_owners": ["amazon-elb"], "trusted_account_ids": [], + "trusted_ips": [], "log_group_retention_days": 365, "max_idle_disconnect_timeout_in_seconds": 600, "max_disconnect_timeout_in_seconds": 300, @@ -95,6 +96,7 @@ config_aws = { "fargate_linux_latest_version": "1.4.0", "fargate_windows_latest_version": "1.0.0", "trusted_account_ids": [], + "trusted_ips": [], "log_group_retention_days": 365, "max_idle_disconnect_timeout_in_seconds": 600, "max_disconnect_timeout_in_seconds": 300, diff --git a/tests/config/fixtures/config.yaml b/tests/config/fixtures/config.yaml index 44e826d382..1b63e2387f 100644 --- a/tests/config/fixtures/config.yaml +++ b/tests/config/fixtures/config.yaml @@ -72,6 +72,11 @@ aws: # trusted_account_ids : ["123456789012", "098765432109", "678901234567"] trusted_account_ids: [] + # AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible) + # Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g. + # trusted_ips: ["1.2.3.4", "10.0.0.0/8"] + trusted_ips: [] + # AWS Cloudwatch Configuration # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days log_group_retention_days: 365 diff --git a/tests/config/fixtures/config_old.yaml b/tests/config/fixtures/config_old.yaml index cbd3bf4fa0..33220e1246 100644 --- a/tests/config/fixtures/config_old.yaml +++ b/tests/config/fixtures/config_old.yaml @@ -25,6 +25,11 @@ ec2_allowed_instance_owners: # trusted_account_ids : ["123456789012", "098765432109", "678901234567"] trusted_account_ids: [] +# AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible) +# Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g. +# trusted_ips: ["1.2.3.4", "10.0.0.0/8"] +trusted_ips: [] + # AWS Cloudwatch Configuration # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days log_group_retention_days: 365 diff --git a/tests/providers/aws/services/iam/lib/policy_test.py b/tests/providers/aws/services/iam/lib/policy_test.py index fe25b4ca0f..afea8ca658 100644 --- a/tests/providers/aws/services/iam/lib/policy_test.py +++ b/tests/providers/aws/services/iam/lib/policy_test.py @@ -13,6 +13,7 @@ from prowler.providers.aws.services.iam.lib.policy import ( is_condition_block_restrictive_organization, is_condition_block_restrictive_sns_endpoint, is_condition_restricting_from_private_ip, + is_condition_restricting_to_trusted_ips, is_policy_public, ) @@ -1982,6 +1983,49 @@ class Test_Policy: } assert not is_condition_restricting_from_private_ip(condition_from_invalid_ip) + def test_is_condition_restricting_to_trusted_ips_no_trusted_ips(self): + condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}} + assert not is_condition_restricting_to_trusted_ips(condition) + + def test_is_condition_restricting_to_trusted_ips_empty_trusted_ips(self): + condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}} + assert not is_condition_restricting_to_trusted_ips(condition, []) + + def test_is_condition_restricting_to_trusted_ips_matching(self): + condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}} + assert is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"]) + + def test_is_condition_restricting_to_trusted_ips_not_matching(self): + condition = {"IpAddress": {"aws:SourceIp": "5.6.7.8"}} + assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"]) + + def test_is_condition_restricting_to_trusted_ips_wildcard(self): + condition = {"IpAddress": {"aws:SourceIp": "*"}} + assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"]) + + def test_is_condition_restricting_to_trusted_ips_open_cidr(self): + condition = {"IpAddress": {"aws:SourceIp": "0.0.0.0/0"}} + assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"]) + + def test_is_condition_restricting_to_trusted_ips_multiple_ips_all_trusted(self): + condition = {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}} + assert is_condition_restricting_to_trusted_ips( + condition, ["1.2.3.4", "5.6.7.8"] + ) + + def test_is_condition_restricting_to_trusted_ips_multiple_ips_partial_trusted(self): + condition = {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "9.9.9.9"]}} + assert not is_condition_restricting_to_trusted_ips( + condition, ["1.2.3.4", "5.6.7.8"] + ) + + def test_is_condition_restricting_to_trusted_ips_cidr_range(self): + condition = {"IpAddress": {"aws:SourceIp": "10.0.0.0/8"}} + assert is_condition_restricting_to_trusted_ips(condition, ["10.0.0.0/8"]) + + def test_is_condition_restricting_to_trusted_ips_no_condition(self): + assert not is_condition_restricting_to_trusted_ips({}, ["1.2.3.4"]) + def test_is_policy_public_(self): policy = { "Statement": [ @@ -2271,6 +2315,48 @@ class Test_Policy: } assert is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER) + def test_is_policy_public_with_trusted_ips(self): + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["*"], + "Condition": { + "IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]} + }, + "Resource": "*", + } + ], + } + assert not is_policy_public( + policy, + TRUSTED_AWS_ACCOUNT_NUMBER, + trusted_ips=["1.2.3.4", "5.6.7.8"], + ) + + def test_is_policy_public_with_trusted_ips_partial_match(self): + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["*"], + "Condition": { + "IpAddress": {"aws:SourceIp": ["1.2.3.4", "9.9.9.9"]} + }, + "Resource": "*", + } + ], + } + assert is_policy_public( + policy, + TRUSTED_AWS_ACCOUNT_NUMBER, + trusted_ips=["1.2.3.4", "5.6.7.8"], + ) + def test_check_admin_access(self): policy = { "Version": "2012-10-17", diff --git a/tests/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible_test.py b/tests/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible_test.py index cdbb87c1a8..8003e9a1b7 100644 --- a/tests/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible_test.py +++ b/tests/providers/aws/services/opensearch/opensearch_service_domains_not_publicly_accessible/opensearch_service_domains_not_publicly_accessible_test.py @@ -74,6 +74,19 @@ policy_data_source_whole_internet = { ], } +policy_data_trusted_ip = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "*"}, + "Action": ["es:ESHttp*"], + "Condition": {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}}, + "Resource": f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}/*", + } + ], +} + class Test_opensearch_service_domains_not_publicly_accessible: @mock_aws @@ -304,3 +317,87 @@ class Test_opensearch_service_domains_not_publicly_accessible: assert result[0].resource_arn == domain_arn assert result[0].region == AWS_REGION_US_WEST_2 assert result[0].resource_tags == [] + + @mock_aws + def test_policy_data_not_restricted_with_trusted_ips(self): + opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2) + domain_arn = opensearch_client.create_domain( + DomainName=domain_name, + AccessPolicies=dumps(policy_data_trusted_ip), + )["DomainStatus"]["ARN"] + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2]) + aws_provider._audit_config = {"trusted_ips": ["1.2.3.4", "5.6.7.8"]} + + from prowler.providers.aws.services.opensearch.opensearch_service import ( + OpenSearchService, + ) + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + mock.patch( + "prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client", + new=OpenSearchService(aws_provider), + ), + ): + from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import ( + opensearch_service_domains_not_publicly_accessible, + ) + + check = opensearch_service_domains_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Opensearch domain {domain_name} is not publicly accessible." + ) + assert result[0].resource_id == domain_name + assert result[0].resource_arn == domain_arn + assert result[0].region == AWS_REGION_US_WEST_2 + assert result[0].resource_tags == [] + + @mock_aws + def test_policy_data_not_restricted_with_trusted_ips_partial_match(self): + opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2) + domain_arn = opensearch_client.create_domain( + DomainName=domain_name, + AccessPolicies=dumps(policy_data_trusted_ip), + )["DomainStatus"]["ARN"] + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2]) + aws_provider._audit_config = {"trusted_ips": ["1.2.3.4"]} + + from prowler.providers.aws.services.opensearch.opensearch_service import ( + OpenSearchService, + ) + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + mock.patch( + "prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client", + new=OpenSearchService(aws_provider), + ), + ): + from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import ( + opensearch_service_domains_not_publicly_accessible, + ) + + check = opensearch_service_domains_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Opensearch domain {domain_name} is publicly accessible via access policy." + ) + assert result[0].resource_id == domain_name + assert result[0].resource_arn == domain_arn + assert result[0].region == AWS_REGION_US_WEST_2 + assert result[0].resource_tags == []