fix(gcp-cloudstorage): handle VPC-blocked API calls as PASS (#9478)

This commit is contained in:
lydiavilchez
2025-12-10 10:40:52 +01:00
committed by GitHub
parent ba45b86a82
commit bfce602859
5 changed files with 135 additions and 3 deletions

View File

@@ -26,6 +26,9 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Update AWS Macie service metadata to new format [(#9265)](https://github.com/prowler-cloud/prowler/pull/9265)
- Update AWS Lightsail service metadata to new format [(#9264)](https://github.com/prowler-cloud/prowler/pull/9264)
### Fixed
- GCP `cloudstorage_uses_vpc_service_controls` check to handle VPC Service Controls blocked API access [(#9478)](https://github.com/prowler-cloud/prowler/pull/9478)
---
## [v5.14.3] (Prowler UNRELEASED)

View File

@@ -1,5 +1,6 @@
from typing import Optional
from googleapiclient.errors import HttpError
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -12,6 +13,7 @@ class CloudStorage(GCPService):
def __init__(self, provider: GcpProvider):
super().__init__("storage", provider)
self.buckets = []
self.vpc_service_controls_protected_projects = set()
self._get_buckets()
def _get_buckets(self):
@@ -93,6 +95,17 @@ class CloudStorage(GCPService):
request = self.client.buckets().list_next(
previous_request=request, previous_response=response
)
except HttpError as http_error:
# Check if the error is due to VPC Service Controls blocking the API
if "vpcServiceControlsUniqueIdentifier" in str(http_error):
self.vpc_service_controls_protected_projects.add(project_id)
logger.warning(
f"Project {project_id} is protected by VPC Service Controls for Cloud Storage API."
)
else:
logger.error(
f"{http_error.__class__.__name__}[{http_error.__traceback__.tb_lineno}]: {http_error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -5,14 +5,22 @@ from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_cl
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_client import (
cloudresourcemanager_client,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_client import (
cloudstorage_client,
)
class cloudstorage_uses_vpc_service_controls(Check):
"""
Ensure Cloud Storage is protected by VPC Service Controls at project level.
Reports PASS if a project is in a VPC Service Controls perimeter
with storage.googleapis.com as a restricted service, otherwise FAIL.
Reports PASS if:
- A project is in a VPC Service Controls perimeter with storage.googleapis.com
as a restricted service, OR
- The Cloud Storage API access is blocked by VPC Service Controls
(verified by vpcServiceControlsUniqueIdentifier in the error response)
Otherwise reports FAIL.
"""
def execute(self) -> list[Check_Report_GCP]:
@@ -47,6 +55,12 @@ class cloudstorage_uses_vpc_service_controls(Check):
if project_resource_id in protected_projects:
report.status = "PASS"
report.status_extended = f"Project {project.id} has VPC Service Controls enabled for Cloud Storage in perimeter {protected_projects[project_resource_id]}."
elif (
project.id
in cloudstorage_client.vpc_service_controls_protected_projects
):
report.status = "PASS"
report.status_extended = f"Project {project.id} has VPC Service Controls enabled for Cloud Storage in undetermined perimeter (verified by API access restriction)."
findings.append(report)

View File

@@ -1,4 +1,6 @@
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from googleapiclient.errors import HttpError
from prowler.providers.gcp.services.cloudstorage.cloudstorage_service import (
CloudStorage,
@@ -56,3 +58,36 @@ class TestCloudStorageService:
assert not cloudstorage_client.buckets[1].public
assert cloudstorage_client.buckets[1].retention_policy is None
assert cloudstorage_client.buckets[1].project_id == GCP_PROJECT_ID
def test_vpc_service_controls_blocked(self):
with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
) as mock_client,
):
mock_resp = MagicMock()
mock_resp.status = 403
mock_resp.reason = "Forbidden"
vpc_error = HttpError(
resp=mock_resp,
content=b'{"error": {"message": "Request is prohibited by organization\'s policy. vpcServiceControlsUniqueIdentifier: 12345"}}',
)
mock_buckets = MagicMock()
mock_buckets.list.return_value.execute.side_effect = vpc_error
mock_client.return_value.buckets.return_value = mock_buckets
cloudstorage_client = CloudStorage(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
assert (
GCP_PROJECT_ID
in cloudstorage_client.vpc_service_controls_protected_projects
)
assert len(cloudstorage_client.buckets) == 0

View File

@@ -315,3 +315,70 @@ class TestCloudStorageUsesVPCServiceControls:
result = check.execute()
assert len(result) == 0
def test_project_protected_by_vpc_sc_api_blocked(self):
cloudresourcemanager_client = mock.MagicMock()
accesscontextmanager_client = mock.MagicMock()
cloudstorage_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage.cloudstorage_uses_vpc_service_controls.cloudstorage_uses_vpc_service_controls.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage.cloudstorage_uses_vpc_service_controls.cloudstorage_uses_vpc_service_controls.accesscontextmanager_client",
new=accesscontextmanager_client,
),
mock.patch(
"prowler.providers.gcp.services.cloudstorage.cloudstorage_uses_vpc_service_controls.cloudstorage_uses_vpc_service_controls.cloudstorage_client",
new=cloudstorage_client,
),
):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_uses_vpc_service_controls.cloudstorage_uses_vpc_service_controls import (
cloudstorage_uses_vpc_service_controls,
)
project1 = Project(
id=GCP_PROJECT_ID, number="123456789012", audit_logging=True
)
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
cloudresourcemanager_client.region = GCP_US_CENTER1_LOCATION
# No service perimeters configured, but API access is blocked by VPC SC
accesscontextmanager_client.service_perimeters = []
cloudstorage_client.vpc_service_controls_protected_projects = {
GCP_PROJECT_ID
}
check = cloudstorage_uses_vpc_service_controls()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Project {GCP_PROJECT_ID} has VPC Service Controls enabled for Cloud Storage in undetermined perimeter (verified by API access restriction)."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == "test-project"
assert result[0].location == GCP_US_CENTER1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID