fix(gcp): credit audit-filtered aggregated sinks in metric-filter checks (#11575)

Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
This commit is contained in:
Aline Almeida
2026-06-16 09:11:16 +01:00
committed by GitHub
parent f1e42d1681
commit cb4b889b20
3 changed files with 184 additions and 3 deletions
@@ -1,5 +1,7 @@
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.gcp.services.logging.logging_service import Logging
from tests.providers.gcp.gcp_fixtures import (
GCP_PROJECT_ID,
@@ -291,6 +293,93 @@ class TestGetProjectsCoveredByAggregatedMetric:
)
assert self._run(logging_client, monitoring_client) == {}
def test_not_covered_when_sink_filter_omits_activity_stream(self):
"""A sink that routes cloudaudit streams but NOT Admin Activity (here,
data_access only) does not deliver the entries the CIS metric filters
match, so it must not be credited — right service, wrong stream."""
logging_client, monitoring_client = self._clients(
sink_filter="logName: /logs/cloudaudit.googleapis.com%2Fdata_access"
)
assert self._run(logging_client, monitoring_client) == {}
def test_covered_when_sink_filter_carries_activity_stream_encoded(self):
"""A sink filtered to the cloudaudit streams (URL-encoded logName form,
as returned by the Logging API) delivers every Admin Activity entry the
CIS metric filters can match, so it must be credited."""
logging_client, monitoring_client = self._clients(
sink_filter=(
"logName: /logs/cloudaudit.googleapis.com%2Factivity OR "
"logName: /logs/cloudaudit.googleapis.com%2Fdata_access"
)
)
assert self._run(logging_client, monitoring_client) == {
GCP_PROJECT_ID: "central-metric"
}
def test_covered_when_sink_filter_carries_activity_stream_plain(self):
logging_client, monitoring_client = self._clients(
sink_filter='logName="projects/p/logs/cloudaudit.googleapis.com/activity"'
)
assert self._run(logging_client, monitoring_client) == {
GCP_PROJECT_ID: "central-metric"
}
@pytest.mark.parametrize(
"sink_filter",
[
# --- Negation: the stream is named but excluded. ---
'NOT logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"',
'-logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"',
'NOT log_id("cloudaudit.googleapis.com/activity")',
# "!=" inequality (and its spaced form) excludes the stream.
'logName!="projects/p/logs/cloudaudit.googleapis.com%2Factivity"',
'logName != "projects/p/logs/cloudaudit.googleapis.com/activity"',
# Activity negated inside a compound filter.
'resource.type="gce_instance" AND '
'NOT logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"',
# --- Restriction: the stream is named but AND-narrowed, so only a
# subset of Admin Activity entries reaches the bucket. ---
'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" '
'AND resource.type="gce_instance"',
'log_id("cloudaudit.googleapis.com/activity") '
'AND resource.type="gce_instance"',
'logName="projects/p/logs/cloudaudit.googleapis.com/activity" '
"AND severity>=ERROR",
'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" '
'AND protoPayload.methodName="SetIamPolicy"',
# --- OR-ed with a non-audit predicate: fail closed, since we credit
# only unions of provable Cloud Audit stream selectors. ---
'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" '
"OR severity>=ERROR",
],
)
def test_not_covered_when_sink_filter_negated_or_restrictive(self, sink_filter):
"""A filter that names the Admin Activity stream but negates, narrows, or
mixes in an unprovable predicate is not credited — we credit only filters
we can prove deliver every Admin Activity entry the CIS metrics match."""
logging_client, monitoring_client = self._clients(sink_filter=sink_filter)
assert self._run(logging_client, monitoring_client) == {}
def test_covered_when_activity_logname_has_hyphenated_path(self):
"""A hyphen in the project path must not be mistaken for the ``-`` (NOT)
negation operator — the activity stream is still delivered."""
logging_client, monitoring_client = self._clients(
sink_filter='logName="projects/my-project/logs/cloudaudit.googleapis.com/activity"'
)
assert self._run(logging_client, monitoring_client) == {
GCP_PROJECT_ID: "central-metric"
}
def test_covered_when_sink_filter_uses_log_id_selector(self):
"""The ``log_id()`` form is an equivalent positive full-coverage selector
of the Admin Activity stream and is credited like the ``logName`` form."""
logging_client, monitoring_client = self._clients(
sink_filter='log_id("cloudaudit.googleapis.com/activity")'
)
assert self._run(logging_client, monitoring_client) == {
GCP_PROJECT_ID: "central-metric"
}
def test_not_covered_when_sink_destination_bucket_differs(self):
logging_client, monitoring_client = self._clients(
sink_destination="logging.googleapis.com/projects/x/locations/eu/buckets/other"