feat(gcp): add check to ensure Managed Instance Groups span multiple zones (#9566)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
lydiavilchez
2025-12-22 15:12:08 +01:00
committed by GitHub
parent 641dc78c3a
commit 43875b6ae7
14 changed files with 710 additions and 1 deletions

View File

@@ -329,7 +329,11 @@ config_azure = {
"defender_attack_path_minimal_risk_level": "High",
}
config_gcp = {"shodan_api_key": None, "max_unused_account_days": 30}
config_gcp = {
"shodan_api_key": None,
"mig_min_zones": 2,
"max_unused_account_days": 30,
}
config_kubernetes = {
"audit_log_maxbackup": 10,

View File

@@ -410,6 +410,9 @@ gcp:
# GCP Compute Configuration
# gcp.compute_public_address_shodan
shodan_api_key: null
# gcp.compute_instance_group_multiple_zones
# Minimum number of zones a MIG should span for high availability
mig_min_zones: 2
max_unused_account_days: 30
# Kubernetes Configuration

View File

@@ -57,6 +57,7 @@ def mock_api_client(GCPService, service, api_version, _):
mock_api_sink_calls(client)
mock_api_services_calls(client)
mock_api_access_policies_calls(client)
mock_api_instance_group_managers_calls(client)
return client
@@ -1184,3 +1185,59 @@ def mock_api_access_policies_calls(client: MagicMock):
client.accessPolicies().servicePerimeters().list = mock_list_service_perimeters
client.accessPolicies().servicePerimeters().list_next.return_value = None
def mock_api_instance_group_managers_calls(client: MagicMock):
"""Mock API calls for Managed Instance Groups (both regional and zonal)."""
regional_mig1_id = str(uuid4())
regional_mig2_id = str(uuid4())
zonal_mig1_id = str(uuid4())
# Mock regional instance group managers
client.regionInstanceGroupManagers().list().execute.return_value = {
"items": [
{
"name": "regional-mig-1",
"id": regional_mig1_id,
"targetSize": 3,
"distributionPolicy": {
"zones": [
{
"zone": "https://www.googleapis.com/compute/v1/projects/test-project/zones/europe-west1-b"
},
{
"zone": "https://www.googleapis.com/compute/v1/projects/test-project/zones/europe-west1-c"
},
{
"zone": "https://www.googleapis.com/compute/v1/projects/test-project/zones/europe-west1-d"
},
]
},
},
{
"name": "regional-mig-single-zone",
"id": regional_mig2_id,
"targetSize": 1,
"distributionPolicy": {
"zones": [
{
"zone": "https://www.googleapis.com/compute/v1/projects/test-project/zones/europe-west1-b"
}
]
},
},
]
}
client.regionInstanceGroupManagers().list_next.return_value = None
# Mock zonal instance group managers
client.instanceGroupManagers().list().execute.return_value = {
"items": [
{
"name": "zonal-mig-1",
"id": zonal_mig1_id,
"targetSize": 2,
},
]
}
client.instanceGroupManagers().list_next.return_value = None

View File

@@ -91,6 +91,7 @@ class TestGCPProvider:
"shodan_api_key": None,
"max_unused_account_days": 180,
"storage_min_retention_days": 90,
"mig_min_zones": 2,
}
@freeze_time(datetime.today())

View File

@@ -0,0 +1,389 @@
from re import search
from unittest import mock
from prowler.providers.gcp.models import GCPProject
from tests.providers.gcp.gcp_fixtures import GCP_PROJECT_ID, set_mocked_gcp_provider
class Test_compute_instance_group_multiple_zones:
"""Tests for the compute_instance_group_multiple_zones check."""
def test_no_instance_groups(self):
"""Test when there are no managed instance groups."""
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = []
compute_client.audit_config = {"mig_min_zones": 2}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 0
def test_regional_mig_multiple_zones_pass(self):
"""Test a regional MIG spanning multiple zones - should PASS."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
mig = ManagedInstanceGroup(
name="regional-mig-1",
id="123456789",
region="us-central1",
zone=None,
zones=["us-central1-a", "us-central1-b", "us-central1-c"],
is_regional=True,
target_size=3,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig]
compute_client.audit_config = {"mig_min_zones": 2}
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "us-central1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Managed Instance Group {mig.name} is a regional MIG spanning 3 zones",
result[0].status_extended,
)
assert result[0].resource_id == mig.id
assert result[0].resource_name == mig.name
assert result[0].location == mig.region
assert result[0].project_id == GCP_PROJECT_ID
def test_zonal_mig_single_zone_fail(self):
"""Test a zonal MIG in a single zone - should FAIL."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
mig = ManagedInstanceGroup(
name="zonal-mig-1",
id="987654321",
region="us-central1",
zone="us-central1-a",
zones=["us-central1-a"],
is_regional=False,
target_size=2,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig]
compute_client.audit_config = {"mig_min_zones": 2}
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "us-central1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Managed Instance Group {mig.name} is a zonal MIG running only in",
result[0].status_extended,
)
assert result[0].resource_id == mig.id
assert result[0].resource_name == mig.name
assert result[0].location == mig.region
assert result[0].project_id == GCP_PROJECT_ID
def test_regional_mig_single_zone_fail(self):
"""Test a regional MIG with only one zone configured - should FAIL."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
mig = ManagedInstanceGroup(
name="regional-mig-single-zone",
id="111222333",
region="europe-west1",
zone=None,
zones=["europe-west1-b"],
is_regional=True,
target_size=1,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig]
compute_client.audit_config = {"mig_min_zones": 2}
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "europe-west1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Managed Instance Group {mig.name} is a regional MIG but only spans 1 zone",
result[0].status_extended,
)
assert result[0].resource_id == mig.id
assert result[0].resource_name == mig.name
assert result[0].location == mig.region
assert result[0].project_id == GCP_PROJECT_ID
def test_multiple_migs_mixed_results(self):
"""Test multiple MIGs with mixed compliance results."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
mig_regional_pass = ManagedInstanceGroup(
name="regional-mig-good",
id="111",
region="us-central1",
zone=None,
zones=["us-central1-a", "us-central1-b"],
is_regional=True,
target_size=2,
project_id=GCP_PROJECT_ID,
)
mig_zonal_fail = ManagedInstanceGroup(
name="zonal-mig-bad",
id="222",
region="us-central1",
zone="us-central1-a",
zones=["us-central1-a"],
is_regional=False,
target_size=1,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig_regional_pass, mig_zonal_fail]
compute_client.audit_config = {"mig_min_zones": 2}
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "us-central1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 2
# First MIG (regional with 2 zones) should pass
assert result[0].status == "PASS"
assert result[0].resource_id == mig_regional_pass.id
# Second MIG (zonal with 1 zone) should fail
assert result[1].status == "FAIL"
assert result[1].resource_id == mig_zonal_fail.id
def test_custom_min_zones_config(self):
"""Test that the configurable min zones parameter is respected."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
# MIG with 2 zones - should fail if min_zones is 3
mig = ManagedInstanceGroup(
name="regional-mig-2zones",
id="333",
region="us-central1",
zone=None,
zones=["us-central1-a", "us-central1-b"],
is_regional=True,
target_size=2,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig]
compute_client.audit_config = {"mig_min_zones": 3} # Require 3 zones
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "us-central1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("minimum required is 3", result[0].status_extended)
def test_default_min_zones_when_not_configured(self):
"""Test that default min_zones (2) is used when not configured."""
from prowler.providers.gcp.services.compute.compute_service import (
ManagedInstanceGroup,
)
mig = ManagedInstanceGroup(
name="regional-mig-default",
id="444",
region="us-central1",
zone=None,
zones=["us-central1-a", "us-central1-b"],
is_regional=True,
target_size=2,
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instance_groups = [mig]
compute_client.audit_config = {} # No mig_min_zones configured
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="test-project",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "us-central1"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_group_multiple_zones.compute_instance_group_multiple_zones import (
compute_instance_group_multiple_zones,
)
check = compute_instance_group_multiple_zones()
result = check.execute()
assert len(result) == 1
# 2 zones >= default 2, so should PASS
assert result[0].status == "PASS"

View File

@@ -186,3 +186,68 @@ class TestComputeService:
assert compute_client.load_balancers[3].service == "regional_service2"
assert compute_client.load_balancers[3].project_id == GCP_PROJECT_ID
assert not compute_client.load_balancers[3].logging
# Test Managed Instance Groups
# We expect 3 MIGs: 2 regional (from region europe-west1-b) and 1 zonal (from zone1)
assert len(compute_client.instance_groups) == 3
# First regional MIG - multiple zones
regional_mig_1 = next(
(
mig
for mig in compute_client.instance_groups
if mig.name == "regional-mig-1"
),
None,
)
assert regional_mig_1 is not None
assert regional_mig_1.id.__class__.__name__ == "str"
assert regional_mig_1.region == "europe-west1-b"
assert regional_mig_1.zone is None # Regional MIGs don't have a single zone
assert len(regional_mig_1.zones) == 3
assert "europe-west1-b" in regional_mig_1.zones
assert "europe-west1-c" in regional_mig_1.zones
assert "europe-west1-d" in regional_mig_1.zones
assert regional_mig_1.is_regional
assert regional_mig_1.target_size == 3
assert regional_mig_1.project_id == GCP_PROJECT_ID
# Second regional MIG - single zone
regional_mig_2 = next(
(
mig
for mig in compute_client.instance_groups
if mig.name == "regional-mig-single-zone"
),
None,
)
assert regional_mig_2 is not None
assert regional_mig_2.id.__class__.__name__ == "str"
assert regional_mig_2.region == "europe-west1-b"
assert regional_mig_2.zone is None
assert len(regional_mig_2.zones) == 1
assert "europe-west1-b" in regional_mig_2.zones
assert regional_mig_2.is_regional
assert regional_mig_2.target_size == 1
assert regional_mig_2.project_id == GCP_PROJECT_ID
# Zonal MIG
zonal_mig = next(
(
mig
for mig in compute_client.instance_groups
if mig.name == "zonal-mig-1"
),
None,
)
assert zonal_mig is not None
assert zonal_mig.id.__class__.__name__ == "str"
assert (
zonal_mig.region == "zone1"
) # zone1 has no hyphen so region is "zone1"
assert zonal_mig.zone == "zone1"
assert len(zonal_mig.zones) == 1
assert "zone1" in zonal_mig.zones
assert not zonal_mig.is_regional
assert zonal_mig.target_size == 2
assert zonal_mig.project_id == GCP_PROJECT_ID