diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 91c58f6cf9..6dc2aafb60 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -28,6 +28,14 @@ All notable changes to the **Prowler SDK** are documented in this file. --- +## [5.30.2] (Prowler UNRELEASED) + +### 🐞 Fixed + +- GCP `logging_log_metric_filter_and_alert_*` checks now credit org-level aggregated sinks filtered to the Admin Activity audit stream [(#11575)](https://github.com/prowler-cloud/prowler/pull/11575) + +--- + ## [5.30.0] (Prowler v5.30.0) ### 🚀 Added diff --git a/prowler/providers/gcp/services/logging/logging_service.py b/prowler/providers/gcp/services/logging/logging_service.py index 3d5b0d1e79..7d8843584e 100644 --- a/prowler/providers/gcp/services/logging/logging_service.py +++ b/prowler/providers/gcp/services/logging/logging_service.py @@ -1,9 +1,12 @@ +import re + from pydantic.v1 import BaseModel from prowler.lib.logger import logger from prowler.providers.gcp.config import DEFAULT_RETRY_ATTEMPTS from prowler.providers.gcp.gcp_provider import GcpProvider from prowler.providers.gcp.lib.service.service import GCPService +from prowler.providers.gcp.services.monitoring.monitoring_service import Monitoring class Logging(GCPService): @@ -121,9 +124,86 @@ class Metric(BaseModel): bucket_name: str = "" +# A positive selector of the Admin Activity stream: a ``logName`` predicate +# (``:`` has-substring or ``=`` equals) or a ``log_id()`` call. Written verbose +# so each fragment stays legible; ``(?![a-z_])`` keeps a longer stream name +# (``.../activity_v2``) from impersonating Admin Activity. +_ACTIVITY_SELECTOR = re.compile( + r""" + (?: logName \s* [:=] \s* | log_id \s* \( \s* ) # logName: / logName= / log_id( + ["']? [^"'\s)]* # optional quote, then path prefix + cloudaudit\.googleapis\.com/activity (?![a-z_]) # the Admin Activity stream itself + """, + re.IGNORECASE | re.VERBOSE, +) + +# The same selector for *any* Cloud Audit stream (activity, data_access, +# system_event, policy, access_transparency, …). Used to strip the OR-combined +# audit clauses so we can prove nothing restrictive is left over. +_CLOUDAUDIT_SELECTOR = re.compile( + r""" + (?: logName \s* [:=] \s* | log_id \s* \( \s* ) # logName: / logName= / log_id( + ["']? [^"'\s)]* # optional quote, then path prefix + cloudaudit\.googleapis\.com/[a-z_]+ # any cloudaudit stream + ["']? \s* \)? # optional closing quote / paren + """, + re.IGNORECASE | re.VERBOSE, +) + +# Operators that exclude or narrow coverage. Any of these means we cannot prove +# the sink delivers the *whole* Admin Activity stream, so it is not credited. +_NEGATION_OR_RESTRICTION = re.compile( + r""" + \bNOT\b # NOT exclusion + | \bAND\b # AND conjunction (restriction) + | != | !: # "!=" / "!:" inequality + | (?:^|[\s(]) -\s* [A-Za-z_] # leading "-" exclusion operator + """, + re.IGNORECASE | re.VERBOSE, +) + + +def _sink_delivers_activity_logs(sink_filter: str) -> bool: + """True only when a sink's filter *provably* exports the full Admin Activity + audit stream (or everything). + + Crediting flips a child project to PASS on a CIS security control, so the + match is deliberately conservative: a false FAIL is safe, a false PASS is + not. A non-``"all"`` filter is credited only when + + 1. it positively selects the Admin Activity stream + (``logName:.../activity``, ``logName="...activity"`` or + ``log_id("...activity")``); + 2. it carries no operator that excludes or narrows the stream — ``NOT`` / + ``-`` / ``!=`` (negation) or ``AND`` (restriction); and + 3. nothing but ``OR``-combined Cloud Audit selectors remains once those are + stripped — an ``OR`` only widens coverage, but any leftover predicate + (``severity>=ERROR``, ``resource.type=...``) could narrow it. + + Sink filters encode the stream URL-encoded (``...%2Factivity``) or as a path + — normalize before matching. + """ + if not sink_filter or sink_filter.strip().lower() == "all": + return True + normalized = sink_filter.replace("%2F", "/").replace("%2f", "/") + # 1. The Admin Activity stream must be positively selected. + if not _ACTIVITY_SELECTOR.search(normalized): + return False + # 2. No operator may exclude or narrow that coverage. + if _NEGATION_OR_RESTRICTION.search(normalized): + return False + # 3. Only OR-combined audit selectors may remain — strip them and the OR + # glue; anything left is a predicate we cannot prove is full-coverage. + remainder = _CLOUDAUDIT_SELECTOR.sub(" ", normalized) + remainder = re.sub(r"\bOR\b|[()\s]", " ", remainder, flags=re.IGNORECASE) + return remainder.strip() == "" + + def get_projects_covered_by_aggregated_metric( - logging_client, monitoring_client, metric_filter -): + logging_client: Logging, + monitoring_client: Monitoring, + metric_filter: str, +) -> dict[str, str]: """Return {project_id: metric_name} for scanned projects whose logs are routed, via an organization-level sink with includeChildren=True, to a bucket that holds a bucket-scoped log metric matching ``metric_filter`` that has an alert policy. @@ -133,6 +213,10 @@ def get_projects_covered_by_aggregated_metric( every child project's logs into one bucket, where a single bucket-scoped metric + alert covers them all. Without crediting that, those child projects are falsely failed. Mirrors the org-sink handling already in ``logging_sink_created`` (#11355). + + A sink is credited when it exports everything (``filter == "all"``) or when its + filter carries the Admin Activity audit stream — the only stream the CIS metric + filters can match (see ``_sink_delivers_activity_logs``). """ # Buckets that hold a matching, alerted, bucket-scoped metric -> metric name. bucket_to_metric = {} @@ -155,7 +239,7 @@ def get_projects_covered_by_aggregated_metric( for sink in logging_client.sinks: if not getattr(sink, "include_children", False): continue - if getattr(sink, "filter", "all") != "all": + if not _sink_delivers_activity_logs(getattr(sink, "filter", "all")): continue for bucket, metric_name in bucket_to_metric.items(): # sink.destination e.g. "logging.googleapis.com/projects/.../buckets/X"; diff --git a/tests/providers/gcp/services/logging/logging_service_test.py b/tests/providers/gcp/services/logging/logging_service_test.py index 72e18eddbb..e466a17314 100644 --- a/tests/providers/gcp/services/logging/logging_service_test.py +++ b/tests/providers/gcp/services/logging/logging_service_test.py @@ -1,5 +1,7 @@ from unittest.mock import MagicMock, patch +import pytest + from prowler.providers.gcp.services.logging.logging_service import Logging from tests.providers.gcp.gcp_fixtures import ( GCP_PROJECT_ID, @@ -291,6 +293,93 @@ class TestGetProjectsCoveredByAggregatedMetric: ) assert self._run(logging_client, monitoring_client) == {} + def test_not_covered_when_sink_filter_omits_activity_stream(self): + """A sink that routes cloudaudit streams but NOT Admin Activity (here, + data_access only) does not deliver the entries the CIS metric filters + match, so it must not be credited — right service, wrong stream.""" + logging_client, monitoring_client = self._clients( + sink_filter="logName: /logs/cloudaudit.googleapis.com%2Fdata_access" + ) + assert self._run(logging_client, monitoring_client) == {} + + def test_covered_when_sink_filter_carries_activity_stream_encoded(self): + """A sink filtered to the cloudaudit streams (URL-encoded logName form, + as returned by the Logging API) delivers every Admin Activity entry the + CIS metric filters can match, so it must be credited.""" + logging_client, monitoring_client = self._clients( + sink_filter=( + "logName: /logs/cloudaudit.googleapis.com%2Factivity OR " + "logName: /logs/cloudaudit.googleapis.com%2Fdata_access" + ) + ) + assert self._run(logging_client, monitoring_client) == { + GCP_PROJECT_ID: "central-metric" + } + + def test_covered_when_sink_filter_carries_activity_stream_plain(self): + logging_client, monitoring_client = self._clients( + sink_filter='logName="projects/p/logs/cloudaudit.googleapis.com/activity"' + ) + assert self._run(logging_client, monitoring_client) == { + GCP_PROJECT_ID: "central-metric" + } + + @pytest.mark.parametrize( + "sink_filter", + [ + # --- Negation: the stream is named but excluded. --- + 'NOT logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"', + '-logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"', + 'NOT log_id("cloudaudit.googleapis.com/activity")', + # "!=" inequality (and its spaced form) excludes the stream. + 'logName!="projects/p/logs/cloudaudit.googleapis.com%2Factivity"', + 'logName != "projects/p/logs/cloudaudit.googleapis.com/activity"', + # Activity negated inside a compound filter. + 'resource.type="gce_instance" AND ' + 'NOT logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity"', + # --- Restriction: the stream is named but AND-narrowed, so only a + # subset of Admin Activity entries reaches the bucket. --- + 'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" ' + 'AND resource.type="gce_instance"', + 'log_id("cloudaudit.googleapis.com/activity") ' + 'AND resource.type="gce_instance"', + 'logName="projects/p/logs/cloudaudit.googleapis.com/activity" ' + "AND severity>=ERROR", + 'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" ' + 'AND protoPayload.methodName="SetIamPolicy"', + # --- OR-ed with a non-audit predicate: fail closed, since we credit + # only unions of provable Cloud Audit stream selectors. --- + 'logName:"projects/p/logs/cloudaudit.googleapis.com%2Factivity" ' + "OR severity>=ERROR", + ], + ) + def test_not_covered_when_sink_filter_negated_or_restrictive(self, sink_filter): + """A filter that names the Admin Activity stream but negates, narrows, or + mixes in an unprovable predicate is not credited — we credit only filters + we can prove deliver every Admin Activity entry the CIS metrics match.""" + logging_client, monitoring_client = self._clients(sink_filter=sink_filter) + assert self._run(logging_client, monitoring_client) == {} + + def test_covered_when_activity_logname_has_hyphenated_path(self): + """A hyphen in the project path must not be mistaken for the ``-`` (NOT) + negation operator — the activity stream is still delivered.""" + logging_client, monitoring_client = self._clients( + sink_filter='logName="projects/my-project/logs/cloudaudit.googleapis.com/activity"' + ) + assert self._run(logging_client, monitoring_client) == { + GCP_PROJECT_ID: "central-metric" + } + + def test_covered_when_sink_filter_uses_log_id_selector(self): + """The ``log_id()`` form is an equivalent positive full-coverage selector + of the Admin Activity stream and is credited like the ``logName`` form.""" + logging_client, monitoring_client = self._clients( + sink_filter='log_id("cloudaudit.googleapis.com/activity")' + ) + assert self._run(logging_client, monitoring_client) == { + GCP_PROJECT_ID: "central-metric" + } + def test_not_covered_when_sink_destination_bucket_differs(self): logging_client, monitoring_client = self._clients( sink_destination="logging.googleapis.com/projects/x/locations/eu/buckets/other"