fix(gcp): set unknown for resource name under metric resources (#9023)

This commit is contained in:
Pedro Martín
2025-10-27 14:19:15 +01:00
committed by GitHub
parent 81e3f87003
commit e694b0f634
7 changed files with 442 additions and 31 deletions

View File

@@ -7,6 +7,16 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Added
- GitHub provider check `organization_default_repository_permission_strict` [(#8785)](https://github.com/prowler-cloud/prowler/pull/8785)
---
## [v5.13.1] (Prowler UNRELEASED)
### Fixed
- Add `resource_name` for checks under `logging` for the GCP provider [(#9023)](https://github.com/prowler-cloud/prowler/pull/9023)
---
## [v5.13.0] (Prowler v5.13.0)
### Added

View File

@@ -14,35 +14,37 @@ class logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled(
'resource.type="gcs_bucket" AND protoPayload.methodName="storage.setIamPermissions"'
in metric.filter
):
metric_name = getattr(metric, "name", None) or "unknown"
report = Check_Report_GCP(
metadata=self.metadata(),
resource=metric,
resource_id=metric_name,
project_id=metric.project_id,
location=logging_client.region,
resource_name=metric.name if metric.name else "Log Metric Filter",
resource_name=(
metric_name if metric_name != "unknown" else "Log Metric Filter"
),
)
projects_with_metric.add(metric.project_id)
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
report.status_extended = f"Log metric filter {metric_name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
if metric_name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
report.status_extended = f"Log metric filter {metric_name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
project_obj = logging_client.projects.get(project)
report = Check_Report_GCP(
metadata=self.metadata(),
resource=logging_client.projects[project],
resource=project_obj,
project_id=project,
location=logging_client.region,
resource_name=(
logging_client.projects[project].name
if logging_client.projects[project].name
else "GCP Project"
),
resource_name=(getattr(project_obj, "name", None) or "GCP Project"),
)
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."

View File

@@ -14,35 +14,38 @@ class logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled(
'(protoPayload.serviceName="cloudresourcemanager.googleapis.com") AND (ProjectOwnership OR projectOwnerInvitee) OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="REMOVE" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner") OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="ADD" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner")'
in metric.filter
):
metric_name = getattr(metric, "name", None) or "unknown"
report = Check_Report_GCP(
metadata=self.metadata(),
resource=metric,
resource_id=metric_name,
project_id=metric.project_id,
location=logging_client.region,
resource_name=metric.name if metric.name else "Log Metric Filter",
resource_name=(
metric_name if metric_name != "unknown" else "Log Metric Filter"
),
)
projects_with_metric.add(metric.project_id)
report.status = "FAIL"
report.status_extended = f"Log metric filter {metric.name} found but no alerts associated in project {metric.project_id}."
report.status_extended = f"Log metric filter {metric_name} found but no alerts associated in project {metric.project_id}."
for alert_policy in monitoring_client.alert_policies:
for filter in alert_policy.filters:
if metric.name in filter:
if metric_name in filter:
report.status = "PASS"
report.status_extended = f"Log metric filter {metric.name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
report.status_extended = f"Log metric filter {metric_name} found with alert policy {alert_policy.display_name} associated in project {metric.project_id}."
break
findings.append(report)
for project in logging_client.project_ids:
if project not in projects_with_metric:
project_obj = logging_client.projects.get(project)
report = Check_Report_GCP(
metadata=self.metadata(),
resource=logging_client.projects[project],
resource=project_obj,
resource_id=project,
project_id=project,
location=logging_client.region,
resource_name=(
logging_client.projects[project].name
if logging_client.projects[project].name
else "GCP Project"
),
resource_name=(getattr(project_obj, "name", None) or "GCP Project"),
)
report.status = "FAIL"
report.status_extended = f"There are no log metric filters or alerts associated in project {project}."

View File

@@ -12,32 +12,32 @@ class logging_sink_created(Check):
for project in logging_client.project_ids:
if project not in projects_with_logging_sink.keys():
project_obj = logging_client.projects.get(project)
report = Check_Report_GCP(
metadata=self.metadata(),
resource=logging_client.projects[project],
resource=project_obj,
resource_id=project,
project_id=project,
location=logging_client.region,
resource_name=(
logging_client.projects[project].name
if logging_client.projects[project].name
else "GCP Project"
),
resource_name=(getattr(project_obj, "name", None) or "GCP Project"),
)
report.status = "FAIL"
report.status_extended = f"There are no logging sinks to export copies of all the log entries in project {project}."
findings.append(report)
else:
sink = projects_with_logging_sink[project]
sink_name = getattr(sink, "name", None) or "unknown"
report = Check_Report_GCP(
metadata=self.metadata(),
resource=projects_with_logging_sink[project],
resource=sink,
resource_id=sink_name,
project_id=project,
location=logging_client.region,
resource_name=(
projects_with_logging_sink[project].name
if projects_with_logging_sink[project].name
else "Logging Sink"
sink_name if sink_name != "unknown" else "Logging Sink"
),
)
report.status = "PASS"
report.status_extended = f"Sink {projects_with_logging_sink[project].name} is enabled exporting copies of all the log entries in project {project}."
report.status_extended = f"Sink {sink_name} is enabled exporting copies of all the log entries in project {project}."
findings.append(report)
return findings

View File

@@ -259,3 +259,141 @@ class Test_logging_log_metric_filter_and_alert_for_bucket_permission_changes_ena
assert result[0].resource_name == "metric_name"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_with_none_name(self):
"""Test that metric with None name uses fallback 'Log Metric Filter'"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
)
# Create a MagicMock metric object with name=None
metric = MagicMock()
metric.name = None
metric.filter = 'resource.type="gcs_bucket" AND protoPayload.methodName="storage.setIamPermissions"'
metric.project_id = GCP_PROJECT_ID
logging_client.metrics = [metric]
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Log Metric Filter"
assert (
result[0].resource_id == "unknown"
) # resource_id should never be None
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
# When name is None, the 'or' pattern makes it use "unknown"
assert "unknown" in result[0].status_extended
def test_log_metric_filters_with_missing_name_attribute(self):
"""Test that metric without name attribute uses fallback 'Log Metric Filter'"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
)
# Create a MagicMock metric object without name attribute
metric = MagicMock(spec=["filter", "project_id"])
metric.filter = 'resource.type="gcs_bucket" AND protoPayload.methodName="storage.setIamPermissions"'
metric.project_id = GCP_PROJECT_ID
logging_client.metrics = [metric]
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Log Metric Filter"
assert (
result[0].resource_id == "unknown"
) # resource_id should never be None
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_project_not_in_projects_dict(self):
"""Test that project not in projects dict uses None and fallback name"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
# Project is in project_ids but NOT in projects dict
logging_client.projects = {}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "GCP Project"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION

View File

@@ -259,3 +259,136 @@ class Test_logging_log_metric_filter_and_alert_for_project_ownership_changes_ena
assert result[0].resource_name == "metric_name"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_with_none_name(self):
"""Test that metric with None name uses fallback 'Log Metric Filter'"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
)
# Create a MagicMock metric object with name=None
metric = MagicMock()
metric.name = None
metric.filter = '(protoPayload.serviceName="cloudresourcemanager.googleapis.com") AND (ProjectOwnership OR projectOwnerInvitee) OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="REMOVE" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner") OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="ADD" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner")'
metric.project_id = GCP_PROJECT_ID
logging_client.metrics = [metric]
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Log Metric Filter"
assert result[0].resource_id == "unknown"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
assert "unknown" in result[0].status_extended
def test_log_metric_filters_with_missing_name_attribute(self):
"""Test that metric without name attribute uses fallback 'Log Metric Filter'"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
)
# Create a MagicMock metric object without name attribute
metric = MagicMock(spec=["filter", "project_id"])
metric.filter = '(protoPayload.serviceName="cloudresourcemanager.googleapis.com") AND (ProjectOwnership OR projectOwnerInvitee) OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="REMOVE" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner") OR (protoPayload.serviceData.policyDelta.bindingDeltas.action="ADD" AND protoPayload.serviceData.policyDelta.bindingDeltas.role="roles/owner")'
metric.project_id = GCP_PROJECT_ID
logging_client.metrics = [metric]
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Log Metric Filter"
assert result[0].resource_id == "unknown"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_project_not_in_projects_dict(self):
"""Test that project not in projects dict uses None and fallback name"""
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
# Project is in project_ids but NOT in projects dict
logging_client.projects = {}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "GCP Project"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION

View File

@@ -211,3 +211,128 @@ class Test_logging_sink_created:
result[0].status_extended
== f"There are no logging sinks to export copies of all the log entries in project {GCP_PROJECT_ID}."
)
def test_project_not_in_projects_dict(self):
"""Test that project not in projects dict uses None and fallback name"""
logging_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
new=logging_client,
),
):
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
logging_sink_created,
)
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.sinks = []
# Project is in project_ids but NOT in projects dict
logging_client.projects = {}
check = logging_sink_created()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "GCP Project"
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_sink_with_none_name(self):
"""Test that sink with None name uses fallback 'Logging Sink'"""
logging_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
new=logging_client,
),
):
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
logging_sink_created,
)
# Create a MagicMock sink object with name=None
sink = MagicMock()
sink.name = None
sink.filter = "all"
sink.project_id = GCP_PROJECT_ID
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.sinks = [sink]
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test",
labels={},
lifecycle_state="ACTIVE",
)
}
check = logging_sink_created()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Logging Sink"
assert result[0].resource_id == "unknown"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
assert "unknown" in result[0].status_extended
def test_sink_with_missing_name_attribute(self):
"""Test that sink without name attribute uses fallback 'Logging Sink'"""
logging_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
new=logging_client,
),
):
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
logging_sink_created,
)
# Create a MagicMock sink object without name attribute
sink = MagicMock(spec=["filter", "project_id"])
sink.filter = "all"
sink.project_id = GCP_PROJECT_ID
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.sinks = [sink]
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test",
labels={},
lifecycle_state="ACTIVE",
)
}
check = logging_sink_created()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Logging Sink"
assert result[0].resource_id == "unknown"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION