feat(gcp): implement Cloud Storage Data Access Audit Logs check (#9220)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
lydiavilchez
2025-11-18 12:08:54 +01:00
committed by GitHub
parent 520cc31f73
commit 0ba1226d88
8 changed files with 553 additions and 7 deletions

View File

@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `cloudstorage_bucket_versioning_enabled` check for GCP provider [(#9014)](https://github.com/prowler-cloud/prowler/pull/9014)
- `cloudstorage_bucket_soft_delete_enabled` check for GCP provider [(#9028)](https://github.com/prowler-cloud/prowler/pull/9028)
- `cloudstorage_bucket_logging_enabled` check for GCP provider [(#9091)](https://github.com/prowler-cloud/prowler/pull/9091)
- `cloudstorage_audit_logs_enabled` check for GCP provider [(#9220)](https://github.com/prowler-cloud/prowler/pull/9220)
- `cloudstorage_bucket_sufficient_retention_period` check for GCP provider [(#9149)](https://github.com/prowler-cloud/prowler/pull/9149)
- C5 compliance framework for Azure provider [(#9081)](https://github.com/prowler-cloud/prowler/pull/9081)
- C5 compliance framework for the GCP provider [(#9097)](https://github.com/prowler-cloud/prowler/pull/9097)

View File

@@ -25,10 +25,25 @@ class CloudResourceManager(GCPService):
.execute(num_retries=DEFAULT_RETRY_ATTEMPTS)
)
audit_logging = False
audit_configs = []
if policy.get("auditConfigs"):
audit_logging = True
for config in policy.get("auditConfigs", []):
log_types = []
for log_config in config.get("auditLogConfigs", []):
log_types.append(log_config.get("logType", ""))
audit_configs.append(
AuditConfig(
service=config.get("service", ""),
log_types=log_types,
)
)
self.cloud_resource_manager_projects.append(
Project(id=project_id, audit_logging=audit_logging)
Project(
id=project_id,
audit_logging=audit_logging,
audit_configs=audit_configs,
)
)
for binding in policy["bindings"]:
self.bindings.append(
@@ -40,7 +55,9 @@ class CloudResourceManager(GCPService):
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.region} -- "
f"{error.__class__.__name__}"
f"[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_organizations(self):
@@ -54,15 +71,23 @@ class CloudResourceManager(GCPService):
for org in response.get("organizations", []):
self.organizations.append(
Organization(
id=org["name"].split("/")[-1], name=org["displayName"]
id=org["name"].split("/")[-1],
name=org["displayName"],
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{self.region} -- "
f"{error.__class__.__name__}"
f"[{error.__traceback__.tb_lineno}]: {error}"
)
class AuditConfig(BaseModel):
service: str
log_types: list[str]
class Binding(BaseModel):
role: str
members: list
@@ -72,6 +97,7 @@ class Binding(BaseModel):
class Project(BaseModel):
id: str
audit_logging: bool
audit_configs: list[AuditConfig] = []
class Organization(BaseModel):

View File

@@ -0,0 +1,36 @@
{
"Provider": "gcp",
"CheckID": "cloudstorage_audit_logs_enabled",
"CheckTitle": "Data Access audit logs are enabled for Cloud Storage",
"CheckType": [],
"ServiceName": "cloudstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "cloudresourcemanager.googleapis.com/Project",
"Description": "Data Access audit logs (DATA_READ and DATA_WRITE) are enabled for Cloud Storage at the project level. Unlike Admin Activity logs (enabled by default), Data Access logs must be explicitly configured to track read and write operations on Cloud Storage objects.",
"Risk": "Without Data Access audit logs, you cannot track who accessed or modified objects in your Cloud Storage buckets, making it difficult to detect unauthorized access, data exfiltration, or compliance violations.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/enable-data-access-audit-logs.html",
"https://cloud.google.com/storage/docs/audit-logging"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1) Console → IAM & Admin → Audit Logs\n2) Find 'Google Cloud Storage' in the list of services\n3) Check the boxes for 'Data Read' and 'Data Write'\n4) Click 'Save' to apply the configuration\n\nNote: This is a project-level setting that applies to all Cloud Storage buckets in the project.",
"Terraform": "```hcl\nresource \"google_project_iam_audit_config\" \"storage_audit\" {\n project = var.project_id\n service = \"storage.googleapis.com\"\n\n audit_log_config {\n log_type = \"DATA_READ\"\n }\n\n audit_log_config {\n log_type = \"DATA_WRITE\"\n }\n}\n```"
},
"Recommendation": {
"Text": "Enable Data Access audit logs (DATA_READ and DATA_WRITE) for Cloud Storage at the project level to track all read and write operations on storage objects for security monitoring and compliance.",
"Url": "https://hub.prowler.com/check/cloudstorage_audit_logs_enabled"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,61 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_client import (
cloudresourcemanager_client,
)
class cloudstorage_audit_logs_enabled(Check):
"""
Ensure GCP Cloud Storage data access audit logs are enabled.
- PASS: Project has audit config for storage.googleapis.com or allServices with
DATA_READ and DATA_WRITE log types enabled.
- FAIL: Project is missing audit config for Cloud Storage,
or missing DATA_READ or DATA_WRITE log types.
"""
def execute(self) -> list[Check_Report_GCP]:
findings = []
for project in cloudresourcemanager_client.cloud_resource_manager_projects:
report = Check_Report_GCP(
metadata=self.metadata(),
resource=cloudresourcemanager_client.projects[project.id],
project_id=project.id,
location=cloudresourcemanager_client.region,
resource_name=(
cloudresourcemanager_client.projects[project.id].name
if cloudresourcemanager_client.projects[project.id].name
else "GCP Project"
),
)
log_types_set = set()
for config in project.audit_configs:
if config.service in ["storage.googleapis.com", "allServices"]:
log_types_set.update(config.log_types)
required_logs = {"DATA_READ", "DATA_WRITE"}
if project.audit_logging:
if required_logs.issubset(log_types_set):
report.status = "PASS"
report.status_extended = f"Project {project.id} has Data Access audit logs (DATA_READ and DATA_WRITE) enabled for Cloud Storage."
else:
report.status = "FAIL"
if not log_types_set:
report.status_extended = f"Project {project.id} has Audit Logs enabled for other services but not for Cloud Storage."
else:
report.status_extended = (
f"Project {project.id} has Audit Logs enabled for Cloud Storage but is missing some required log types"
f"(missing: {', '.join(sorted(required_logs - log_types_set))})."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Project {project.id} does not have Audit Logs enabled."
)
findings.append(report)
return findings

View File

@@ -85,7 +85,15 @@ def mock_api_projects_calls(client: MagicMock):
client.projects().locations().keys().list_next.return_value = None
# Mocking policy
client.projects().getIamPolicy().execute.return_value = {
"auditConfigs": [MagicMock()],
"auditConfigs": [
{
"service": "allServices",
"auditLogConfigs": [
{"logType": "ADMIN_READ"},
{"logType": "DATA_WRITE"},
],
}
],
"bindings": [
{
"role": "roles/resourcemanager.organizationAdmin",
@@ -246,7 +254,15 @@ def mock_api_projects_calls(client: MagicMock):
== "projects/123/locations/eu-west1/keyRings/keyring1/cryptoKeys/key1"
):
return_value.execute.return_value = {
"auditConfigs": [MagicMock()],
"auditConfigs": [
{
"service": "allServices",
"auditLogConfigs": [
{"logType": "ADMIN_READ"},
{"logType": "DATA_WRITE"},
],
}
],
"bindings": [
{
"role": "roles/resourcemanager.organizationAdmin",
@@ -274,7 +290,15 @@ def mock_api_projects_calls(client: MagicMock):
== "projects/123/locations/eu-west1/keyRings/keyring2/cryptoKeys/key2"
):
return_value.execute.return_value = {
"auditConfigs": [MagicMock()],
"auditConfigs": [
{
"service": "allServices",
"auditLogConfigs": [
{"logType": "ADMIN_READ"},
{"logType": "DATA_WRITE"},
],
}
],
"bindings": [
{
"role": "roles/resourcemanager.organizationAdmin",

View File

@@ -0,0 +1,398 @@
from unittest import mock
from prowler.providers.gcp.models import GCPProject
from tests.providers.gcp.gcp_fixtures import (
GCP_PROJECT_ID,
GCP_US_CENTER1_LOCATION,
set_mocked_gcp_provider,
)
class TestCloudStorageAuditLogsEnabled:
def test_no_projects(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
cloudresourcemanager_client.cloud_resource_manager_projects = []
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 0
def test_project_with_storage_audit_logs_enabled(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
AuditConfig,
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
project = Project(
id=GCP_PROJECT_ID,
audit_logging=True,
audit_configs=[
AuditConfig(
service="storage.googleapis.com",
log_types=["DATA_READ", "DATA_WRITE"],
)
],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} has Data Access audit logs "
f"(DATA_READ and DATA_WRITE) enabled for Cloud Storage."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"
def test_project_with_audit_logs_but_no_storage_config(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
AuditConfig,
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
# Project has audit logs enabled but for a different service (not Cloud Storage)
project = Project(
id=GCP_PROJECT_ID,
audit_logging=True,
audit_configs=[
AuditConfig(
service="compute.googleapis.com",
log_types=["DATA_READ", "DATA_WRITE"],
)
],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} has Audit Logs enabled for other services but not for Cloud Storage."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"
def test_project_without_audit_logs(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
project = Project(
id=GCP_PROJECT_ID,
audit_logging=False,
audit_configs=[],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} does not have Audit Logs enabled."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"
def test_project_with_missing_log_types(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
AuditConfig,
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
project = Project(
id=GCP_PROJECT_ID,
audit_logging=True,
audit_configs=[
AuditConfig(
service="storage.googleapis.com",
log_types=["DATA_WRITE"],
)
],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} has Audit Logs enabled for Cloud Storage but is missing some required log types"
f"(missing: DATA_READ)."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"
def test_project_with_combined_audit_configs(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
AuditConfig,
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
# Project has both allServices (with DATA_READ)
# and storage.googleapis.com (with DATA_WRITE)
project = Project(
id=GCP_PROJECT_ID,
audit_logging=True,
audit_configs=[
AuditConfig(
service="allServices",
log_types=["DATA_READ"],
),
AuditConfig(
service="storage.googleapis.com",
log_types=["DATA_WRITE"],
),
],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} has Data Access audit logs "
f"(DATA_READ and DATA_WRITE) enabled for Cloud Storage."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"
def test_project_with_allservices_audit_config(self):
cloudresourcemanager_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage."
"cloudstorage_audit_logs_enabled."
"cloudstorage_audit_logs_enabled."
"cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
AuditConfig,
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_audit_logs_enabled.cloudstorage_audit_logs_enabled import (
cloudstorage_audit_logs_enabled,
)
# Project has allServices with both log types
project = Project(
id=GCP_PROJECT_ID,
audit_logging=True,
audit_configs=[
AuditConfig(
service="allServices",
log_types=["DATA_READ", "DATA_WRITE"],
)
],
)
cloudresourcemanager_client.cloud_resource_manager_projects = [project]
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
check = cloudstorage_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"Project {GCP_PROJECT_ID} has Data Access audit logs "
f"(DATA_READ and DATA_WRITE) enabled for Cloud Storage."
)
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].resource_name == "test-project"