feat(iam): Add trusted IP configurable option to reduce false positives in 'opensearch' check (#8631)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
Eran Cohen
2026-03-10 13:12:54 +02:00
committed by GitHub
parent d3213e9f1e
commit 0b461233c1
10 changed files with 262 additions and 1 deletions

View File

@@ -73,6 +73,7 @@ The following list includes all the AWS checks with configurable variables that
| `ssm_documents_set_as_public` | `trusted_account_ids` | List of Strings | | `ssm_documents_set_as_public` | `trusted_account_ids` | List of Strings |
| `vpc_endpoint_connections_trust_boundaries` | `trusted_account_ids` | List of Strings | | `vpc_endpoint_connections_trust_boundaries` | `trusted_account_ids` | List of Strings |
| `vpc_endpoint_services_allowed_principals_trust_boundaries` | `trusted_account_ids` | List of Strings | | `vpc_endpoint_services_allowed_principals_trust_boundaries` | `trusted_account_ids` | List of Strings |
| `opensearch_service_domains_not_publicly_accessible` | `trusted_ips` | List of Strings |
## Azure ## Azure

View File

@@ -8,6 +8,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `entra_conditional_access_policy_approved_client_app_required_for_mobile` check for m365 provider [(#10216)](https://github.com/prowler-cloud/prowler/pull/10216) - `entra_conditional_access_policy_approved_client_app_required_for_mobile` check for m365 provider [(#10216)](https://github.com/prowler-cloud/prowler/pull/10216)
- `entra_conditional_access_policy_compliant_device_hybrid_joined_device_mfa_required` check for M365 provider [(#10197)](https://github.com/prowler-cloud/prowler/pull/10197) - `entra_conditional_access_policy_compliant_device_hybrid_joined_device_mfa_required` check for M365 provider [(#10197)](https://github.com/prowler-cloud/prowler/pull/10197)
- Add `trusted_ips` configurable option to `opensearch_service_domains_not_publicly_accessible` check to reduce false positives on IP-restricted policies [(#8631)](https://github.com/prowler-cloud/prowler/pull/8631)
### 🔄 Changed ### 🔄 Changed

View File

@@ -72,6 +72,11 @@ aws:
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"] # trusted_account_ids : ["123456789012", "098765432109", "678901234567"]
trusted_account_ids: [] trusted_account_ids: []
# AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible)
# Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g.
# trusted_ips: ["1.2.3.4", "10.0.0.0/8"]
trusted_ips: []
# AWS Cloudwatch Configuration # AWS Cloudwatch Configuration
# aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days
log_group_retention_days: 365 log_group_retention_days: 365

View File

@@ -380,6 +380,56 @@ def is_condition_restricting_from_private_ip(condition_statement: dict) -> bool:
return is_from_private_ip return is_from_private_ip
def is_condition_restricting_to_trusted_ips(
condition_statement: dict, trusted_ips: list = None
) -> bool:
"""Check if the policy condition restricts access to trusted IP addresses.
Keyword arguments:
condition_statement -- The policy condition to check. For example:
{
"IpAddress": {
"aws:SourceIp": "X.X.X.X"
}
}
trusted_ips -- A list of trusted IP addresses or CIDR ranges.
"""
if not trusted_ips:
return False
try:
CONDITION_OPERATOR = "IpAddress"
CONDITION_KEY = "aws:sourceip"
if condition_statement.get(CONDITION_OPERATOR, {}):
condition_statement[CONDITION_OPERATOR] = {
k.lower(): v for k, v in condition_statement[CONDITION_OPERATOR].items()
}
if condition_statement[CONDITION_OPERATOR].get(CONDITION_KEY, ""):
if not isinstance(
condition_statement[CONDITION_OPERATOR][CONDITION_KEY], list
):
condition_statement[CONDITION_OPERATOR][CONDITION_KEY] = [
condition_statement[CONDITION_OPERATOR][CONDITION_KEY]
]
trusted_ips_set = {ip.lower() for ip in trusted_ips}
for ip in condition_statement[CONDITION_OPERATOR][CONDITION_KEY]:
if ip == "*" or ip == "0.0.0.0/0":
return False
if ip not in trusted_ips_set:
return False
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
# TODO: Add logic for deny statements # TODO: Add logic for deny statements
def is_policy_public( def is_policy_public(
policy: dict, policy: dict,
@@ -388,6 +438,7 @@ def is_policy_public(
not_allowed_actions: list = [], not_allowed_actions: list = [],
check_cross_service_confused_deputy=False, check_cross_service_confused_deputy=False,
trusted_account_ids: list = None, trusted_account_ids: list = None,
trusted_ips: list = None,
) -> bool: ) -> bool:
""" """
Check if the policy allows public access to the resource. Check if the policy allows public access to the resource.
@@ -399,6 +450,7 @@ def is_policy_public(
not_allowed_actions (list): List of actions that are not allowed, default: []. If not_allowed_actions is empty, the function will not consider the actions in the policy. not_allowed_actions (list): List of actions that are not allowed, default: []. If not_allowed_actions is empty, the function will not consider the actions in the policy.
check_cross_service_confused_deputy (bool): If the policy is checked for cross-service confused deputy, default: False check_cross_service_confused_deputy (bool): If the policy is checked for cross-service confused deputy, default: False
trusted_account_ids (list): A list of trusted accound ids to reduce false positives on cross-account checks trusted_account_ids (list): A list of trusted accound ids to reduce false positives on cross-account checks
trusted_ips (list): A list of trusted IP addresses or CIDR ranges to reduce false positives on IP-based checks
Returns: Returns:
bool: True if the policy allows public access, False otherwise bool: True if the policy allows public access, False otherwise
""" """
@@ -511,6 +563,10 @@ def is_policy_public(
and not is_condition_restricting_from_private_ip( and not is_condition_restricting_from_private_ip(
statement.get("Condition", {}) statement.get("Condition", {})
) )
and not is_condition_restricting_to_trusted_ips(
statement.get("Condition", {}),
trusted_ips,
)
) )
if is_public: if is_public:
break break

View File

@@ -8,6 +8,7 @@ from prowler.providers.aws.services.opensearch.opensearch_client import (
class opensearch_service_domains_not_publicly_accessible(Check): class opensearch_service_domains_not_publicly_accessible(Check):
def execute(self): def execute(self):
findings = [] findings = []
trusted_ips = opensearch_client.audit_config.get("trusted_ips", [])
for domain in opensearch_client.opensearch_domains.values(): for domain in opensearch_client.opensearch_domains.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=domain) report = Check_Report_AWS(metadata=self.metadata(), resource=domain)
report.status = "PASS" report.status = "PASS"
@@ -18,7 +19,9 @@ class opensearch_service_domains_not_publicly_accessible(Check):
if domain.vpc_id: if domain.vpc_id:
report.status_extended = f"Opensearch domain {domain.name} is in a VPC, then it is not publicly accessible." report.status_extended = f"Opensearch domain {domain.name} is in a VPC, then it is not publicly accessible."
elif domain.access_policy is not None and is_policy_public( elif domain.access_policy is not None and is_policy_public(
domain.access_policy, opensearch_client.audited_account domain.access_policy,
opensearch_client.audited_account,
trusted_ips=trusted_ips,
): ):
report.status = "FAIL" report.status = "FAIL"
report.status_extended = f"Opensearch domain {domain.name} is publicly accessible via access policy." report.status_extended = f"Opensearch domain {domain.name} is publicly accessible via access policy."

View File

@@ -31,6 +31,7 @@ old_config_aws = {
"ec2_allowed_interface_types": ["api_gateway_managed", "vpc_endpoint"], "ec2_allowed_interface_types": ["api_gateway_managed", "vpc_endpoint"],
"ec2_allowed_instance_owners": ["amazon-elb"], "ec2_allowed_instance_owners": ["amazon-elb"],
"trusted_account_ids": [], "trusted_account_ids": [],
"trusted_ips": [],
"log_group_retention_days": 365, "log_group_retention_days": 365,
"max_idle_disconnect_timeout_in_seconds": 600, "max_idle_disconnect_timeout_in_seconds": 600,
"max_disconnect_timeout_in_seconds": 300, "max_disconnect_timeout_in_seconds": 300,
@@ -95,6 +96,7 @@ config_aws = {
"fargate_linux_latest_version": "1.4.0", "fargate_linux_latest_version": "1.4.0",
"fargate_windows_latest_version": "1.0.0", "fargate_windows_latest_version": "1.0.0",
"trusted_account_ids": [], "trusted_account_ids": [],
"trusted_ips": [],
"log_group_retention_days": 365, "log_group_retention_days": 365,
"max_idle_disconnect_timeout_in_seconds": 600, "max_idle_disconnect_timeout_in_seconds": 600,
"max_disconnect_timeout_in_seconds": 300, "max_disconnect_timeout_in_seconds": 300,

View File

@@ -72,6 +72,11 @@ aws:
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"] # trusted_account_ids : ["123456789012", "098765432109", "678901234567"]
trusted_account_ids: [] trusted_account_ids: []
# AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible)
# Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g.
# trusted_ips: ["1.2.3.4", "10.0.0.0/8"]
trusted_ips: []
# AWS Cloudwatch Configuration # AWS Cloudwatch Configuration
# aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days
log_group_retention_days: 365 log_group_retention_days: 365

View File

@@ -25,6 +25,11 @@ ec2_allowed_instance_owners:
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"] # trusted_account_ids : ["123456789012", "098765432109", "678901234567"]
trusted_account_ids: [] trusted_account_ids: []
# AWS OpenSearch Configuration (opensearch_service_domains_not_publicly_accessible)
# Trusted IP addresses or CIDR ranges that should not be considered as public access, e.g.
# trusted_ips: ["1.2.3.4", "10.0.0.0/8"]
trusted_ips: []
# AWS Cloudwatch Configuration # AWS Cloudwatch Configuration
# aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days # aws.cloudwatch_log_group_retention_policy_specific_days_enabled --> by default is 365 days
log_group_retention_days: 365 log_group_retention_days: 365

View File

@@ -13,6 +13,7 @@ from prowler.providers.aws.services.iam.lib.policy import (
is_condition_block_restrictive_organization, is_condition_block_restrictive_organization,
is_condition_block_restrictive_sns_endpoint, is_condition_block_restrictive_sns_endpoint,
is_condition_restricting_from_private_ip, is_condition_restricting_from_private_ip,
is_condition_restricting_to_trusted_ips,
is_policy_public, is_policy_public,
) )
@@ -1982,6 +1983,49 @@ class Test_Policy:
} }
assert not is_condition_restricting_from_private_ip(condition_from_invalid_ip) assert not is_condition_restricting_from_private_ip(condition_from_invalid_ip)
def test_is_condition_restricting_to_trusted_ips_no_trusted_ips(self):
condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}}
assert not is_condition_restricting_to_trusted_ips(condition)
def test_is_condition_restricting_to_trusted_ips_empty_trusted_ips(self):
condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}}
assert not is_condition_restricting_to_trusted_ips(condition, [])
def test_is_condition_restricting_to_trusted_ips_matching(self):
condition = {"IpAddress": {"aws:SourceIp": "1.2.3.4"}}
assert is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"])
def test_is_condition_restricting_to_trusted_ips_not_matching(self):
condition = {"IpAddress": {"aws:SourceIp": "5.6.7.8"}}
assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"])
def test_is_condition_restricting_to_trusted_ips_wildcard(self):
condition = {"IpAddress": {"aws:SourceIp": "*"}}
assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"])
def test_is_condition_restricting_to_trusted_ips_open_cidr(self):
condition = {"IpAddress": {"aws:SourceIp": "0.0.0.0/0"}}
assert not is_condition_restricting_to_trusted_ips(condition, ["1.2.3.4"])
def test_is_condition_restricting_to_trusted_ips_multiple_ips_all_trusted(self):
condition = {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}}
assert is_condition_restricting_to_trusted_ips(
condition, ["1.2.3.4", "5.6.7.8"]
)
def test_is_condition_restricting_to_trusted_ips_multiple_ips_partial_trusted(self):
condition = {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "9.9.9.9"]}}
assert not is_condition_restricting_to_trusted_ips(
condition, ["1.2.3.4", "5.6.7.8"]
)
def test_is_condition_restricting_to_trusted_ips_cidr_range(self):
condition = {"IpAddress": {"aws:SourceIp": "10.0.0.0/8"}}
assert is_condition_restricting_to_trusted_ips(condition, ["10.0.0.0/8"])
def test_is_condition_restricting_to_trusted_ips_no_condition(self):
assert not is_condition_restricting_to_trusted_ips({}, ["1.2.3.4"])
def test_is_policy_public_(self): def test_is_policy_public_(self):
policy = { policy = {
"Statement": [ "Statement": [
@@ -2271,6 +2315,48 @@ class Test_Policy:
} }
assert is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER) assert is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_is_policy_public_with_trusted_ips(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["*"],
"Condition": {
"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}
},
"Resource": "*",
}
],
}
assert not is_policy_public(
policy,
TRUSTED_AWS_ACCOUNT_NUMBER,
trusted_ips=["1.2.3.4", "5.6.7.8"],
)
def test_is_policy_public_with_trusted_ips_partial_match(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["*"],
"Condition": {
"IpAddress": {"aws:SourceIp": ["1.2.3.4", "9.9.9.9"]}
},
"Resource": "*",
}
],
}
assert is_policy_public(
policy,
TRUSTED_AWS_ACCOUNT_NUMBER,
trusted_ips=["1.2.3.4", "5.6.7.8"],
)
def test_check_admin_access(self): def test_check_admin_access(self):
policy = { policy = {
"Version": "2012-10-17", "Version": "2012-10-17",

View File

@@ -74,6 +74,19 @@ policy_data_source_whole_internet = {
], ],
} }
policy_data_trusted_ip = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["es:ESHttp*"],
"Condition": {"IpAddress": {"aws:SourceIp": ["1.2.3.4", "5.6.7.8"]}},
"Resource": f"arn:aws:es:us-west-2:{AWS_ACCOUNT_NUMBER}:domain/{domain_name}/*",
}
],
}
class Test_opensearch_service_domains_not_publicly_accessible: class Test_opensearch_service_domains_not_publicly_accessible:
@mock_aws @mock_aws
@@ -304,3 +317,87 @@ class Test_opensearch_service_domains_not_publicly_accessible:
assert result[0].resource_arn == domain_arn assert result[0].resource_arn == domain_arn
assert result[0].region == AWS_REGION_US_WEST_2 assert result[0].region == AWS_REGION_US_WEST_2
assert result[0].resource_tags == [] assert result[0].resource_tags == []
@mock_aws
def test_policy_data_not_restricted_with_trusted_ips(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2)
domain_arn = opensearch_client.create_domain(
DomainName=domain_name,
AccessPolicies=dumps(policy_data_trusted_ip),
)["DomainStatus"]["ARN"]
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
aws_provider._audit_config = {"trusted_ips": ["1.2.3.4", "5.6.7.8"]}
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client",
new=OpenSearchService(aws_provider),
),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import (
opensearch_service_domains_not_publicly_accessible,
)
check = opensearch_service_domains_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Opensearch domain {domain_name} is not publicly accessible."
)
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
assert result[0].region == AWS_REGION_US_WEST_2
assert result[0].resource_tags == []
@mock_aws
def test_policy_data_not_restricted_with_trusted_ips_partial_match(self):
opensearch_client = client("opensearch", region_name=AWS_REGION_US_WEST_2)
domain_arn = opensearch_client.create_domain(
DomainName=domain_name,
AccessPolicies=dumps(policy_data_trusted_ip),
)["DomainStatus"]["ARN"]
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
aws_provider._audit_config = {"trusted_ips": ["1.2.3.4"]}
from prowler.providers.aws.services.opensearch.opensearch_service import (
OpenSearchService,
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible.opensearch_client",
new=OpenSearchService(aws_provider),
),
):
from prowler.providers.aws.services.opensearch.opensearch_service_domains_not_publicly_accessible.opensearch_service_domains_not_publicly_accessible import (
opensearch_service_domains_not_publicly_accessible,
)
check = opensearch_service_domains_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Opensearch domain {domain_name} is publicly accessible via access policy."
)
assert result[0].resource_id == domain_name
assert result[0].resource_arn == domain_arn
assert result[0].region == AWS_REGION_US_WEST_2
assert result[0].resource_tags == []