fix(gcp): keyerrors in services cloudsql and monitoring (#8909)

This commit is contained in:
Daniel Barranquero
2025-10-14 15:30:00 +02:00
committed by GitHub
parent 8c3e1b96f9
commit 272e4547b2
5 changed files with 399 additions and 5 deletions

View File

@@ -40,6 +40,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Fix SNS topics showing empty AWS_ResourceID in Quick Inventory output [(#8762)](https://github.com/prowler-cloud/prowler/issues/8762)
- Fix HTML Markdown output for long strings [(#8803)](https://github.com/prowler-cloud/prowler/pull/8803)
- Prowler ThreatScore scoring calculation CLI [(#8582)](https://github.com/prowler-cloud/prowler/pull/8582)
- Fix KeyError in CloudSQL and Monitoring services in GCP provider [(#8909)](https://github.com/prowler-cloud/prowler/pull/8909)
---

View File

@@ -37,9 +37,9 @@ class CloudSQL(GCPService):
ssl_mode=instance["settings"]
.get("ipConfiguration", {})
.get("sslMode", "ALLOW_UNENCRYPTED_AND_ENCRYPTED"),
automated_backups=instance["settings"][
"backupConfiguration"
]["enabled"],
automated_backups=instance["settings"]
.get("backupConfiguration", {})
.get("enabled", False),
authorized_networks=instance["settings"]
.get("ipConfiguration", {})
.get("authorizedNetworks", []),

View File

@@ -33,8 +33,36 @@ class Monitoring(GCPService):
for policy in response.get("alertPolicies", []):
filters = []
for condition in policy["conditions"]:
filters.append(condition["conditionThreshold"]["filter"])
for condition in policy.get("conditions", []):
# Handle different condition types
if "conditionThreshold" in condition:
filter_value = condition["conditionThreshold"].get(
"filter", ""
)
if filter_value:
filters.append(filter_value)
elif "conditionAbsent" in condition:
filter_value = condition["conditionAbsent"].get(
"filter", ""
)
if filter_value:
filters.append(filter_value)
elif "conditionMatchedLog" in condition:
filter_value = condition["conditionMatchedLog"].get(
"filter", ""
)
if filter_value:
filters.append(filter_value)
elif "conditionMonitoringQueryLanguage" in condition:
query = condition[
"conditionMonitoringQueryLanguage"
].get("query", "")
if query:
filters.append(query)
else:
logger.warning(
f"Unknown condition type in alert policy {policy.get('name', 'Unknown')}: {condition.keys()}"
)
self.alert_policies.append(
AlertPolicy(
name=policy["name"],

View File

@@ -63,3 +63,154 @@ class TestCloudSQLService:
]
assert cloudsql_client.instances[1].flags == []
assert cloudsql_client.instances[1].project_id == GCP_PROJECT_ID
def test_instances_without_backup_configuration(self):
"""Test that CloudSQL service handles instances without backupConfiguration field"""
def mock_api_client_without_backup_config(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
client.instances().list().execute.return_value = {
"items": [
{
"name": "instance_no_backup_config",
"databaseVersion": "MYSQL_8_0",
"region": "us-east1",
"ipAddresses": [{"type": "PRIVATE", "ipAddress": "10.0.0.1"}],
"settings": {
"ipConfiguration": {
"requireSsl": True,
"sslMode": "ENCRYPTED_ONLY",
"authorizedNetworks": [],
},
"databaseFlags": [],
# backupConfiguration is missing
},
}
]
}
client.instances().list_next.return_value = None
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_without_backup_config,
),
):
cloudsql_client = CloudSQL(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
# Should handle gracefully with default value
assert len(cloudsql_client.instances) == 1
assert cloudsql_client.instances[0].name == "instance_no_backup_config"
assert (
cloudsql_client.instances[0].automated_backups is False
) # Default value
def test_instances_with_empty_backup_configuration(self):
"""Test that CloudSQL service handles instances with empty backupConfiguration"""
def mock_api_client_with_empty_backup_config(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
client.instances().list().execute.return_value = {
"items": [
{
"name": "instance_empty_backup_config",
"databaseVersion": "POSTGRES_14",
"region": "europe-west1",
"ipAddresses": [
{"type": "PRIMARY", "ipAddress": "34.34.34.34"}
],
"settings": {
"ipConfiguration": {
"requireSsl": False,
},
"backupConfiguration": {}, # Empty but present
},
}
]
}
client.instances().list_next.return_value = None
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_with_empty_backup_config,
),
):
cloudsql_client = CloudSQL(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
# Should handle gracefully with default value when 'enabled' key is missing
assert len(cloudsql_client.instances) == 1
assert cloudsql_client.instances[0].name == "instance_empty_backup_config"
assert (
cloudsql_client.instances[0].automated_backups is False
) # Default value
def test_instances_without_settings_fields(self):
"""Test that CloudSQL service handles instances with minimal settings"""
def mock_api_client_with_minimal_settings(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
client.instances().list().execute.return_value = {
"items": [
{
"name": "instance_minimal",
"databaseVersion": "SQLSERVER_2019_STANDARD",
"region": "asia-east1",
"settings": {}, # Minimal settings object
}
]
}
client.instances().list_next.return_value = None
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_with_minimal_settings,
),
):
cloudsql_client = CloudSQL(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
# Should handle gracefully with all default values
assert len(cloudsql_client.instances) == 1
assert cloudsql_client.instances[0].name == "instance_minimal"
assert cloudsql_client.instances[0].automated_backups is False
assert cloudsql_client.instances[0].require_ssl is False
assert (
cloudsql_client.instances[0].ssl_mode
== "ALLOW_UNENCRYPTED_AND_ENCRYPTED"
)
assert cloudsql_client.instances[0].authorized_networks == []
assert cloudsql_client.instances[0].flags == []

View File

@@ -86,3 +86,217 @@ class TestMonitoringService:
assert "111222233334444" in monitoring_client.sa_api_metrics
assert "55566666777888999" in monitoring_client.sa_api_metrics
assert "0000000000000" not in monitoring_client.sa_api_metrics
def test_alert_policies_with_different_condition_types(self):
"""Test that monitoring service handles different alert policy condition types"""
def mock_api_client_with_mixed_conditions(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
# Mock alert policies with different condition types
client.projects().alertPolicies().list().execute.return_value = {
"alertPolicies": [
{
"name": "policy_with_threshold",
"displayName": "Threshold Policy",
"conditions": [
{
"conditionThreshold": {
"filter": 'metric.type="compute.googleapis.com/instance/cpu/utilization"',
"comparison": "COMPARISON_GT",
"thresholdValue": 0.8,
}
}
],
"enabled": True,
},
{
"name": "policy_with_absent",
"displayName": "Absent Policy",
"conditions": [
{
"conditionAbsent": {
"filter": 'metric.type="compute.googleapis.com/instance/uptime"',
"duration": "300s",
}
}
],
"enabled": True,
},
{
"name": "policy_with_log",
"displayName": "Log Match Policy",
"conditions": [
{
"conditionMatchedLog": {
"filter": 'severity="ERROR"',
}
}
],
"enabled": True,
},
{
"name": "policy_with_mql",
"displayName": "MQL Policy",
"conditions": [
{
"conditionMonitoringQueryLanguage": {
"query": 'fetch gce_instance | metric "compute.googleapis.com/instance/cpu/utilization"',
"duration": "60s",
}
}
],
"enabled": True,
},
]
}
client.projects().alertPolicies().list_next.return_value = None
client.projects().timeSeries().list().execute.return_value = {
"timeSeries": []
}
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_with_mixed_conditions,
),
):
monitoring_client = Monitoring(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
assert len(monitoring_client.alert_policies) == 4
# Verify threshold condition
threshold_policy = monitoring_client.alert_policies[0]
assert threshold_policy.name == "policy_with_threshold"
assert len(threshold_policy.filters) == 1
assert (
'metric.type="compute.googleapis.com/instance/cpu/utilization"'
in threshold_policy.filters[0]
)
# Verify absent condition
absent_policy = monitoring_client.alert_policies[1]
assert absent_policy.name == "policy_with_absent"
assert len(absent_policy.filters) == 1
assert (
'metric.type="compute.googleapis.com/instance/uptime"'
in absent_policy.filters[0]
)
# Verify log condition
log_policy = monitoring_client.alert_policies[2]
assert log_policy.name == "policy_with_log"
assert len(log_policy.filters) == 1
assert 'severity="ERROR"' in log_policy.filters[0]
# Verify MQL condition
mql_policy = monitoring_client.alert_policies[3]
assert mql_policy.name == "policy_with_mql"
assert len(mql_policy.filters) == 1
assert "fetch gce_instance" in mql_policy.filters[0]
def test_alert_policies_with_missing_conditions(self):
"""Test that monitoring service handles alert policies with missing conditions field"""
def mock_api_client_with_missing_conditions(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
client.projects().alertPolicies().list().execute.return_value = {
"alertPolicies": [
{
"name": "policy_without_conditions",
"displayName": "Policy Without Conditions",
"enabled": True,
}
]
}
client.projects().alertPolicies().list_next.return_value = None
client.projects().timeSeries().list().execute.return_value = {
"timeSeries": []
}
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_with_missing_conditions,
),
):
monitoring_client = Monitoring(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
# Should handle gracefully and create policy with empty filters
assert len(monitoring_client.alert_policies) == 1
assert (
monitoring_client.alert_policies[0].name == "policy_without_conditions"
)
assert monitoring_client.alert_policies[0].filters == []
def test_alert_policies_with_empty_filter_values(self):
"""Test that monitoring service skips conditions with empty filter values"""
def mock_api_client_with_empty_filters(*args, **kwargs):
from unittest.mock import MagicMock
client = MagicMock()
client.projects().alertPolicies().list().execute.return_value = {
"alertPolicies": [
{
"name": "policy_with_empty_filter",
"displayName": "Policy With Empty Filter",
"conditions": [
{
"conditionThreshold": {
"filter": "", # Empty filter
"comparison": "COMPARISON_GT",
"thresholdValue": 1000,
}
}
],
"enabled": True,
}
]
}
client.projects().alertPolicies().list_next.return_value = None
client.projects().timeSeries().list().execute.return_value = {
"timeSeries": []
}
return client
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client_with_empty_filters,
),
):
monitoring_client = Monitoring(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
# Should skip empty filters
assert len(monitoring_client.alert_policies) == 1
assert monitoring_client.alert_policies[0].filters == []