Compare commits

...

2 Commits
5.7.5 ... v5.1

Author SHA1 Message Date
Prowler Bot
67c2c9d53f fix(cloudsql): add trusted client certificates case for cloudsql_instance_ssl_connections (#6686)
Co-authored-by: Rubén De la Torre Vico <rubendltv22@gmail.com>
2025-01-24 12:19:24 -05:00
Prowler Bot
6bc68b785e fix(cloudwatch): NoneType object is not iterable (#6676)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2025-01-23 13:24:58 -05:00
3 changed files with 67 additions and 13 deletions

View File

@@ -12,18 +12,21 @@ class cloudwatch_log_group_not_publicly_accessible(Check):
and logs_client.log_groups is not None
):
for resource_policies in logs_client.resource_policies.values():
for resource_policy in resource_policies:
if is_policy_public(
resource_policy.policy, logs_client.audited_account
):
for statement in resource_policy.policy.get("Statement", []):
public_resources = statement.get("Resource", [])
if isinstance(public_resources, str):
public_resources = [public_resources]
for resource in public_resources:
for log_group in logs_client.log_groups.values():
if log_group.arn in resource or resource == "*":
public_log_groups.append(log_group.arn)
if resource_policies is not None:
for resource_policy in resource_policies:
if is_policy_public(
resource_policy.policy, logs_client.audited_account
):
for statement in resource_policy.policy.get(
"Statement", []
):
public_resources = statement.get("Resource", [])
if isinstance(public_resources, str):
public_resources = [public_resources]
for resource in public_resources:
for log_group in logs_client.log_groups.values():
if log_group.arn in resource or resource == "*":
public_log_groups.append(log_group.arn)
for log_group in logs_client.log_groups.values():
report = Check_Report_AWS(self.metadata())
report.region = log_group.region

View File

@@ -15,7 +15,10 @@ class cloudsql_instance_ssl_connections(Check):
report.status_extended = (
f"Database Instance {instance.name} requires SSL connections."
)
if not instance.require_ssl or instance.ssl_mode != "ENCRYPTED_ONLY":
if (
not instance.require_ssl
or instance.ssl_mode == "ALLOW_UNENCRYPTED_AND_ENCRYPTED"
):
report.status = "FAIL"
report.status_extended = f"Database Instance {instance.name} does not require SSL connections."
findings.append(report)

View File

@@ -167,3 +167,51 @@ class Test_cloudsql_instance_ssl_connections:
assert result[0].resource_name == "instance1"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_cloudsql_instance_ssl_connections_enabled_with_trusted_client_certificates(
self,
):
cloudsql_client = mock.MagicMock()
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_ssl_connections.cloudsql_instance_ssl_connections.cloudsql_client",
new=cloudsql_client,
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_ssl_connections.cloudsql_instance_ssl_connections import (
cloudsql_instance_ssl_connections,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="instance1",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=True,
ssl_mode="TRUSTED_CLIENT_CERTIFICATE_REQUIRED",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
)
]
check = cloudsql_instance_ssl_connections()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Database Instance instance1 requires SSL connections."
)
assert result[0].resource_id == "instance1"
assert result[0].resource_name == "instance1"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID