feat(gcp): add check cloudstorage_bucket_sufficient_retention_period (#9149)

This commit is contained in:
lydiavilchez
2025-11-11 15:51:57 +01:00
committed by GitHub
parent 1292abcf91
commit b0ec7daece
12 changed files with 367 additions and 20 deletions

View File

@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `cloudstorage_bucket_versioning_enabled` check for GCP provider [(#9014)](https://github.com/prowler-cloud/prowler/pull/9014)
- `cloudstorage_bucket_soft_delete_enabled` check for GCP provider [(#9028)](https://github.com/prowler-cloud/prowler/pull/9028)
- `cloudstorage_bucket_logging_enabled` check for GCP provider [(#9091)](https://github.com/prowler-cloud/prowler/pull/9091)
- `cloudstorage_bucket_sufficient_retention_period` check for GCP provider [(#9149)](https://github.com/prowler-cloud/prowler/pull/9149)
- C5 compliance framework for Azure provider [(#9081)](https://github.com/prowler-cloud/prowler/pull/9081)
- C5 compliance framework for the GCP provider [(#9097)](https://github.com/prowler-cloud/prowler/pull/9097)
- `organization_repository_creation_limited` check for GitHub provider [(#8844)](https://github.com/prowler-cloud/prowler/pull/8844)

View File

@@ -511,6 +511,9 @@ gcp:
# gcp.iam_service_account_unused
# gcp.iam_sa_user_managed_key_unused
max_unused_account_days: 180
# GCP Storage Sufficient Retention Period
# gcp.cloudstorage_bucket_sufficient_retention_period
storage_min_retention_days: 90
# Kubernetes Configuration
kubernetes:

View File

@@ -1,26 +1,29 @@
{
"Provider": "gcp",
"CheckID": "cloudstorage_bucket_log_retention_policy_lock",
"CheckTitle": "Ensure That Retention Policies on Cloud Storage Buckets Used for Exporting Logs Are Configured Using Bucket Lock",
"CheckTitle": "Cloud Storage log bucket has a Retention Policy with Bucket Lock enabled",
"CheckType": [],
"ServiceName": "cloudstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Bucket",
"Description": "Enabling retention policies on log buckets will protect logs stored in cloud storage buckets from being overwritten or accidentally deleted.",
"Risk": "Sinks can be configured to export logs in storage buckets. It is recommended to configure a data retention policy for these cloud storage buckets and to lock the data retention policy, thus permanently preventing the policy from being reduced or removed. This way, if the system is ever compromised by an attacker or a malicious insider who wants to cover their tracks, the activity logs are definitely preserved for forensics and security investigations.",
"ResourceType": "storage.googleapis.com/Bucket",
"Description": "**Google Cloud Storage buckets** used as **log sinks** are evaluated to ensure that a **Retention Policy** is configured and **Bucket Lock** is enabled. Enabling Bucket Lock permanently prevents the retention policy from being reduced or removed, protecting logs from modification or deletion.",
"Risk": "Log sink buckets without a locked retention policy are at risk of log tampering or accidental deletion. Without Bucket Lock, an attacker or user could remove or shorten the retention policy, compromising the integrity of audit logs required for forensics and compliance investigations.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/retention-policies-with-bucket-lock.html"
],
"Remediation": {
"Code": {
"CLI": "",
"CLI": "gcloud storage buckets lock-retention-policy gs://<LOG_BUCKET_NAME>",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/retention-policies-with-bucket-lock.html",
"Terraform": "https://docs.prowler.com/checks/gcp/logging-policies-1/ensure-that-retention-policies-on-log-buckets-are-configured-using-bucket-lock#terraform"
"Other": "1) Open Google Cloud Console → Storage → Buckets → <LOG_BUCKET_NAME>\n2) Go to the **Configuration** tab\n3) Under **Retention policy**, ensure a retention duration is set\n4) Click **Lock** to enable Bucket Lock and confirm the operation",
"Terraform": "```hcl\nresource \"google_storage_bucket\" \"log_bucket\" {\n name = var.log_bucket_name\n location = var.location\n\n retention_policy {\n retention_period = 31536000 # 365 days in seconds\n is_locked = true\n }\n}\n```"
},
"Recommendation": {
"Text": "It is recommended to set up retention policies and configure Bucket Lock on all storage buckets that are used as log sinks.",
"Url": "https://cloud.google.com/storage/docs/using-uniform-bucket-level-access"
"Text": "Configure a retention policy and enable Bucket Lock on all Cloud Storage buckets used as log sinks to ensure log integrity and immutability.",
"Url": "https://hub.prowler.com/check/cloudstorage_bucket_log_retention_policy_lock"
}
},
"Categories": [],

View File

@@ -6,7 +6,14 @@ from prowler.providers.gcp.services.logging.logging_client import logging_client
class cloudstorage_bucket_log_retention_policy_lock(Check):
def execute(self) -> Check_Report_GCP:
"""
Ensure Log Sink buckets have a Retention Policy with Bucket Lock enabled.
- PASS: Log sink bucket has a retention policy and is locked.
- FAIL: Log sink bucket has no retention policy, or it has one but is not locked.
"""
def execute(self) -> list[Check_Report_GCP]:
findings = []
# Get Log Sink Buckets
log_buckets = []
@@ -22,8 +29,8 @@ class cloudstorage_bucket_log_retention_policy_lock(Check):
)
if bucket.retention_policy:
report.status = "FAIL"
report.status_extended = f"Log Sink Bucket {bucket.name} has no Retention Policy but without Bucket Lock."
if bucket.retention_policy.get("isLocked", False):
report.status_extended = f"Log Sink Bucket {bucket.name} has a Retention Policy but without Bucket Lock."
if bucket.retention_policy.is_locked:
report.status = "PASS"
report.status_extended = f"Log Sink Bucket {bucket.name} has a Retention Policy with Bucket Lock."
findings.append(report)

View File

@@ -0,0 +1,35 @@
{
"Provider": "gcp",
"CheckID": "cloudstorage_bucket_sufficient_retention_period",
"CheckTitle": "Cloud Storage bucket has a sufficient Retention Policy period",
"CheckType": [],
"ServiceName": "cloudstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "storage.googleapis.com/Bucket",
"Description": "Cloud Storage bucket has a bucket-level Retention Policy with a retentionPeriod that meets or exceeds the organization-defined minimum, preventing deletion or modification of objects before the required time.",
"Risk": "Insufficient or missing retention allows premature deletion or modification of objects, weakening data recovery and compliance with retention requirements.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/sufficient-retention-period.html"
],
"Remediation": {
"Code": {
"CLI": "gcloud storage buckets update gs://<BUCKET_NAME> --retention-period=<SECONDS>",
"NativeIaC": "",
"Other": "1) Console → Storage → Buckets → <BUCKET_NAME>\n2) Tab 'Configuration' → 'Retention policy'\n3) Set the required retention period (e.g., 90 or 365 days) and save\n4) (Optional) Lock the policy if required by compliance",
"Terraform": "```hcl\nresource \"google_storage_bucket\" \"example\" {\n name = var.bucket_name\n location = var.location\n\n retention_policy {\n retention_period = 7776000 # 90 days in seconds\n }\n}\n```"
},
"Recommendation": {
"Text": "Define and apply a bucket-level Retention Policy that meets your minimum retention requirement (e.g., 90 or 365 days) to enforce data recoverability and compliance.",
"Url": "https://hub.prowler.com/check/cloudstorage_bucket_sufficient_retention_period"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,55 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.cloudstorage.cloudstorage_client import (
cloudstorage_client,
)
class cloudstorage_bucket_sufficient_retention_period(Check):
    """Verify GCS buckets have a sufficiently long bucket-level retention policy.

    PASS when the bucket's retention period, converted to whole days, is at
    least the configured minimum (``storage_min_retention_days``, default 90).
    FAIL when the bucket has no retention policy or its period is shorter.
    """

    def execute(self) -> list[Check_Report_GCP]:
        """Produce one report per bucket known to the CloudStorage client."""
        # Minimum retention threshold in days, taken from the audit config
        # when present; falls back to 90 days otherwise.
        threshold_days = int(
            getattr(cloudstorage_client, "audit_config", {}).get(
                "storage_min_retention_days", 90
            )
        )
        reports = []
        for bucket in cloudstorage_client.buckets:
            finding = Check_Report_GCP(metadata=self.metadata(), resource=bucket)
            policy = bucket.retention_policy
            if policy is None:
                finding.status = "FAIL"
                finding.status_extended = (
                    f"Bucket {bucket.name} does not have a retention policy "
                    f"(minimum required: {threshold_days} days)."
                )
            else:
                # The API reports the retention period in seconds; compare
                # whole days against the configured threshold.
                retention_days = policy.retention_period // 86400
                if retention_days >= threshold_days:
                    finding.status = "PASS"
                    finding.status_extended = (
                        f"Bucket {bucket.name} has a sufficient retention policy "
                        f"of {retention_days} days "
                        f"(minimum required: {threshold_days})."
                    )
                else:
                    finding.status = "FAIL"
                    finding.status_extended = (
                        f"Bucket {bucket.name} has an insufficient retention policy "
                        f"of {retention_days} days "
                        f"(minimum required: {threshold_days})."
                    )
            reports.append(finding)
        return reports

View File

@@ -56,6 +56,21 @@ class CloudStorage(GCPService):
logging_bucket = logging_info.get("logBucket")
logging_prefix = logging_info.get("logObjectPrefix")
retention_policy_raw = bucket.get("retentionPolicy")
retention_policy = None
if isinstance(retention_policy_raw, dict):
rp_seconds = retention_policy_raw.get("retentionPeriod")
if rp_seconds:
retention_policy = RetentionPolicy(
retention_period=int(rp_seconds),
is_locked=bool(
retention_policy_raw.get("isLocked", False)
),
effective_time=retention_policy_raw.get(
"effectiveTime"
),
)
self.buckets.append(
Bucket(
name=bucket["name"],
@@ -65,7 +80,7 @@ class CloudStorage(GCPService):
"uniformBucketLevelAccess"
]["enabled"],
public=public,
retention_policy=bucket.get("retentionPolicy"),
retention_policy=retention_policy,
project_id=project_id,
lifecycle_rules=lifecycle_rules,
versioning_enabled=versioning_enabled,
@@ -84,6 +99,12 @@ class CloudStorage(GCPService):
)
class RetentionPolicy(BaseModel):
    """Bucket-level retention policy parsed from the GCS API's retentionPolicy object."""

    # Retention duration in seconds (API field retentionPolicy.retentionPeriod).
    retention_period: int
    # Whether Bucket Lock is enabled (API field retentionPolicy.isLocked).
    is_locked: bool
    # Raw retentionPolicy.effectiveTime value from the API, when present
    # (presumably an RFC 3339 timestamp — confirm against the Storage API docs).
    effective_time: Optional[str] = None
class Bucket(BaseModel):
name: str
id: str
@@ -91,7 +112,7 @@ class Bucket(BaseModel):
uniform_bucket_level_access: bool
public: bool
project_id: str
retention_policy: Optional[dict] = None
retention_policy: Optional[RetentionPolicy] = None
lifecycle_rules: Optional[list[dict]] = None
versioning_enabled: Optional[bool] = False
soft_delete_enabled: Optional[bool] = False

View File

@@ -90,6 +90,7 @@ class TestGCPProvider:
assert gcp_provider.audit_config == {
"shodan_api_key": None,
"max_unused_account_days": 180,
"storage_min_retention_days": 90,
}
@freeze_time(datetime.today())

View File

@@ -31,6 +31,7 @@ class TestCloudStorageBucketLogRetentionPolicyLock:
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
Bucket,
RetentionPolicy,
)
from prowler.providers.gcp.services.logging.logging_service import Sink
@@ -53,7 +54,11 @@ class TestCloudStorageBucketLogRetentionPolicyLock:
region=GCP_US_CENTER1_LOCATION,
uniform_bucket_level_access=True,
public=True,
retention_policy={"isLocked": True},
retention_policy=RetentionPolicy(
retention_period=31536000,
is_locked=True,
effective_time=None,
),
project_id=GCP_PROJECT_ID,
)
]
@@ -95,6 +100,7 @@ class TestCloudStorageBucketLogRetentionPolicyLock:
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
Bucket,
RetentionPolicy,
)
from prowler.providers.gcp.services.logging.logging_service import Sink
@@ -117,7 +123,11 @@ class TestCloudStorageBucketLogRetentionPolicyLock:
region=GCP_US_CENTER1_LOCATION,
uniform_bucket_level_access=True,
public=True,
retention_policy={"isLocked": False},
retention_policy=RetentionPolicy(
retention_period=31536000,
is_locked=False,
effective_time=None,
),
project_id=GCP_PROJECT_ID,
)
]
@@ -129,7 +139,7 @@ class TestCloudStorageBucketLogRetentionPolicyLock:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Log Sink Bucket {cloudstorage_client.buckets[0].name} has no Retention Policy but without Bucket Lock."
== f"Log Sink Bucket {cloudstorage_client.buckets[0].name} has a Retention Policy but without Bucket Lock."
)
assert result[0].resource_id == "example-bucket"
assert result[0].resource_name == "example-bucket"

View File

@@ -0,0 +1,202 @@
from unittest import mock
from tests.providers.gcp.gcp_fixtures import (
GCP_PROJECT_ID,
GCP_US_CENTER1_LOCATION,
set_mocked_gcp_provider,
)
class TestCloudStorageBucketSufficientRetentionPeriod:
    """Unit tests for the cloudstorage_bucket_sufficient_retention_period check.

    Each test replaces the module-level cloudstorage_client with a MagicMock,
    mocks the global provider, imports the check inside the patch context (so
    the patched client is picked up at import time), and asserts on the
    reports returned by execute().
    """

    def test_no_buckets(self):
        """With no buckets in inventory, the check yields no findings."""
        cloudstorage_client = mock.MagicMock()
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_gcp_provider(),
            ),
            mock.patch(
                "prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period.cloudstorage_client",
                new=cloudstorage_client,
            ),
        ):
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period import (
                cloudstorage_bucket_sufficient_retention_period,
            )

            cloudstorage_client.project_ids = [GCP_PROJECT_ID]
            cloudstorage_client.region = GCP_US_CENTER1_LOCATION
            cloudstorage_client.buckets = []
            cloudstorage_client.audit_config = {"storage_min_retention_days": 90}
            check = cloudstorage_bucket_sufficient_retention_period()
            result = check.execute()
            assert len(result) == 0

    def test_bucket_without_retention_policy(self):
        """A bucket with retention_policy=None FAILs with the 'no policy' message."""
        cloudstorage_client = mock.MagicMock()
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_gcp_provider(),
            ),
            mock.patch(
                "prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period.cloudstorage_client",
                new=cloudstorage_client,
            ),
        ):
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period import (
                cloudstorage_bucket_sufficient_retention_period,
            )
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
                Bucket,
            )

            cloudstorage_client.project_ids = [GCP_PROJECT_ID]
            cloudstorage_client.region = GCP_US_CENTER1_LOCATION
            cloudstorage_client.audit_config = {"storage_min_retention_days": 90}
            cloudstorage_client.buckets = [
                Bucket(
                    name="no-retention-policy",
                    id="no-retention-policy",
                    region=GCP_US_CENTER1_LOCATION,
                    uniform_bucket_level_access=True,
                    public=False,
                    retention_policy=None,
                    project_id=GCP_PROJECT_ID,
                    lifecycle_rules=[],
                    versioning_enabled=True,
                )
            ]
            check = cloudstorage_bucket_sufficient_retention_period()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "FAIL"
            assert (
                result[0].status_extended
                == "Bucket no-retention-policy does not have a retention policy (minimum required: 90 days)."
            )
            assert result[0].resource_id == "no-retention-policy"
            assert result[0].resource_name == "no-retention-policy"
            assert result[0].location == GCP_US_CENTER1_LOCATION
            assert result[0].project_id == GCP_PROJECT_ID

    def test_bucket_with_sufficient_retention_policy(self):
        """A 140-day retention period (12096000 s) PASSes against a 90-day minimum."""
        cloudstorage_client = mock.MagicMock()
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_gcp_provider(),
            ),
            mock.patch(
                "prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period.cloudstorage_client",
                new=cloudstorage_client,
            ),
        ):
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period import (
                cloudstorage_bucket_sufficient_retention_period,
            )
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
                Bucket,
                RetentionPolicy,
            )

            cloudstorage_client.project_ids = [GCP_PROJECT_ID]
            cloudstorage_client.region = GCP_US_CENTER1_LOCATION
            cloudstorage_client.audit_config = {"storage_min_retention_days": 90}
            cloudstorage_client.buckets = [
                Bucket(
                    name="sufficient-retention-policy",
                    id="sufficient-retention-policy",
                    region=GCP_US_CENTER1_LOCATION,
                    uniform_bucket_level_access=True,
                    public=False,
                    retention_policy=RetentionPolicy(
                        retention_period=12096000,  # 140 days
                        is_locked=False,
                        effective_time=None,
                    ),
                    project_id=GCP_PROJECT_ID,
                    lifecycle_rules=[],
                    versioning_enabled=True,
                )
            ]
            check = cloudstorage_bucket_sufficient_retention_period()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                result[0].status_extended
                == "Bucket sufficient-retention-policy has a sufficient retention policy of 140 days (minimum required: 90)."
            )
            assert result[0].resource_id == "sufficient-retention-policy"
            assert result[0].resource_name == "sufficient-retention-policy"
            assert result[0].location == GCP_US_CENTER1_LOCATION
            assert result[0].project_id == GCP_PROJECT_ID

    def test_bucket_with_insufficient_retention_policy(self):
        """A 7-day retention period (604800 s) FAILs against a 90-day minimum."""
        cloudstorage_client = mock.MagicMock()
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_gcp_provider(),
            ),
            mock.patch(
                "prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period.cloudstorage_client",
                new=cloudstorage_client,
            ),
        ):
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_bucket_sufficient_retention_period.cloudstorage_bucket_sufficient_retention_period import (
                cloudstorage_bucket_sufficient_retention_period,
            )
            from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
                Bucket,
                RetentionPolicy,
            )

            cloudstorage_client.project_ids = [GCP_PROJECT_ID]
            cloudstorage_client.region = GCP_US_CENTER1_LOCATION
            cloudstorage_client.audit_config = {"storage_min_retention_days": 90}
            cloudstorage_client.buckets = [
                Bucket(
                    name="insufficient-retention-policy",
                    id="insufficient-retention-policy",
                    region=GCP_US_CENTER1_LOCATION,
                    uniform_bucket_level_access=True,
                    public=False,
                    retention_policy=RetentionPolicy(
                        retention_period=604800,  # 7 days
                        is_locked=False,
                        effective_time=None,
                    ),
                    project_id=GCP_PROJECT_ID,
                    lifecycle_rules=[],
                    versioning_enabled=True,
                )
            ]
            check = cloudstorage_bucket_sufficient_retention_period()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "FAIL"
            assert (
                result[0].status_extended
                == "Bucket insufficient-retention-policy has an insufficient retention policy of 7 days (minimum required: 90)."
            )
            assert result[0].resource_id == "insufficient-retention-policy"
            assert result[0].resource_name == "insufficient-retention-policy"
            assert result[0].location == GCP_US_CENTER1_LOCATION
            assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -2,6 +2,7 @@ from unittest.mock import patch
from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
CloudStorage,
RetentionPolicy,
)
from tests.providers.gcp.gcp_fixtures import (
GCP_PROJECT_ID,
@@ -35,9 +36,17 @@ class TestCloudStorageService:
assert cloudstorage_client.buckets[0].region == "US"
assert cloudstorage_client.buckets[0].uniform_bucket_level_access
assert cloudstorage_client.buckets[0].public
assert cloudstorage_client.buckets[0].retention_policy == {
"retentionPeriod": 10
}
assert isinstance(
cloudstorage_client.buckets[0].retention_policy, RetentionPolicy
)
assert (
cloudstorage_client.buckets[0].retention_policy.retention_period == 10
)
assert cloudstorage_client.buckets[0].retention_policy.is_locked is False
assert (
cloudstorage_client.buckets[0].retention_policy.effective_time is None
)
assert cloudstorage_client.buckets[0].project_id == GCP_PROJECT_ID
assert cloudstorage_client.buckets[1].name == "bucket2"