From 9dc4deccb6aaa1ef02550fe9acbe474241665fa3 Mon Sep 17 00:00:00 2001 From: s1ns3nz0 Date: Tue, 19 May 2026 22:52:16 +0900 Subject: [PATCH] feat(gcp): add cloudsql_instance_cmek_encryption_enabled check (#11023) Co-authored-by: Daniel Barranquero --- prowler/CHANGELOG.md | 8 + .../__init__.py | 0 ...ance_cmek_encryption_enabled.metadata.json | 42 +++ ...oudsql_instance_cmek_encryption_enabled.py | 25 ++ .../gcp/services/cloudsql/cloudsql_service.py | 36 ++- tests/providers/gcp/gcp_fixtures.py | 6 +- ...l_instance_cmek_encryption_enabled_test.py | 277 ++++++++++++++++++ .../cloudsql/cloudsql_service_test.py | 13 +- 8 files changed, 390 insertions(+), 17 deletions(-) create mode 100644 prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/__init__.py create mode 100644 prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.metadata.json create mode 100644 prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.py create mode 100644 tests/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled_test.py diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 2185d970ad..883222e26a 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to the **Prowler SDK** are documented in this file. +## [5.28.0] (Prowler UNRELEASED) + +### 🚀 Added + +- `cloudsql_instance_cmek_encryption_enabled` check for GCP provider [(#11023)](https://github.com/prowler-cloud/prowler/pull/11023) + +--- + ## [5.27.0] (Prowler v5.27.0) ### 🚀 Added diff --git a/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/__init__.py b/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.metadata.json b/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.metadata.json new file mode 100644 index 0000000000..5ec026f9ed --- /dev/null +++ b/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.metadata.json @@ -0,0 +1,42 @@ +{ + "Provider": "gcp", + "CheckID": "cloudsql_instance_cmek_encryption_enabled", + "CheckTitle": "Cloud SQL instance is encrypted with a customer-managed key (CMEK)", + "CheckType": [], + "ServiceName": "cloudsql", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "sqladmin.googleapis.com/Instance", + "Description": "**Cloud SQL instances** use **customer-managed encryption keys** (`CMEK`) via Cloud KMS for at-rest encryption. The evaluation identifies instances lacking a configured **Cloud KMS key**, indicating use of default Google-managed encryption instead.", + "Risk": "Without CMEK, Google holds sole control of the encryption keys. If the organization must demonstrate key custody, meet data residency requirements, or immediately revoke access to data (e.g., upon contract termination), Google-managed keys are insufficient. This may violate ISMS-P 2.7.1 and regulatory requirements for sensitive or personal data.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://cloud.google.com/sql/docs/mysql/cmek", + "https://cloud.google.com/sql/docs/postgres/cmek", + "https://cloud.google.com/sql/docs/sqlserver/cmek", + "https://cloud.google.com/kms/docs/resource-hierarchy" + ], + "Remediation": { + "Code": { + "CLI": "gcloud sql instances create \\\n --database-version= \\\n --region= \\\n --disk-encryption-key=projects//locations//keyRings//cryptoKeys/", + "NativeIaC": "", + "Other": "CMEK must be configured at instance creation time. To migrate an existing instance:\n1. Create a new Cloud SQL instance with CMEK enabled.\n2. Export data from the existing instance.\n3. Import data into the new CMEK-enabled instance.\n4. Update application connection strings.", + "Terraform": "```hcl\nresource \"google_sql_database_instance\" \"example\" {\n name = \"\"\n database_version = \"\"\n region = \"\"\n\n encryption_key_name = \"projects//locations//keyRings//cryptoKeys/\"\n\n settings {\n tier = \"db-custom-2-7680\"\n }\n}\n```" + }, + "Recommendation": { + "Text": "For instances storing personal or sensitive data, create new Cloud SQL instances with CMEK using a Cloud KMS key in the same region. Ensure the Cloud SQL service account has the roles/cloudkms.cryptoKeyEncrypterDecrypter role on the key, and enable key rotation per your policy.", + "Url": "https://hub.prowler.com/check/cloudsql_instance_cmek_encryption_enabled" + } + }, + "Categories": [ + "encryption" + ], + "DependsOn": [], + "RelatedTo": [ + "kms_key_rotation_enabled", + "kms_key_not_publicly_accessible", + "bigquery_dataset_cmk_encryption" + ], + "Notes": "CMEK cannot be enabled on an existing Cloud SQL instance; it must be set at creation time. Existing instances require data migration to a new CMEK-enabled instance." +} diff --git a/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.py b/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.py new file mode 100644 index 0000000000..8048ced453 --- /dev/null +++ b/prowler/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled.py @@ -0,0 +1,25 @@ +from prowler.lib.check.models import Check, Check_Report_GCP +from prowler.providers.gcp.services.cloudsql.cloudsql_client import cloudsql_client + + +class cloudsql_instance_cmek_encryption_enabled(Check): + def execute(self) -> Check_Report_GCP: + findings = [] + for instance in cloudsql_client.instances: + if instance.instance_type != "CLOUD_SQL_INSTANCE": + continue + report = Check_Report_GCP(metadata=self.metadata(), resource=instance) + if instance.cmek_key_name: + report.status = "PASS" + report.status_extended = ( + f"Database instance {instance.name} is encrypted with " + f"customer-managed key: {instance.cmek_key_name}." + ) + else: + report.status = "FAIL" + report.status_extended = ( + f"Database instance {instance.name} is not encrypted with a " + f"customer-managed key (CMEK); Google-managed key is in use." + ) + findings.append(report) + return findings diff --git a/prowler/providers/gcp/services/cloudsql/cloudsql_service.py b/prowler/providers/gcp/services/cloudsql/cloudsql_service.py index 2d04a4248c..137169999a 100644 --- a/prowler/providers/gcp/services/cloudsql/cloudsql_service.py +++ b/prowler/providers/gcp/services/cloudsql/cloudsql_service.py @@ -1,3 +1,5 @@ +from typing import Optional + from pydantic.v1 import BaseModel from prowler.lib.logger import logger @@ -24,6 +26,8 @@ class CloudSQL(GCPService): for address in instance.get("ipAddresses", []): if address["type"] == "PRIMARY": public_ip = True + settings = instance.get("settings", {}) + ip_config = settings.get("ipConfiguration", {}) self.instances.append( Instance( name=instance["name"], @@ -31,19 +35,23 @@ class CloudSQL(GCPService): region=instance["region"], ip_addresses=instance.get("ipAddresses", []), public_ip=public_ip, - require_ssl=instance["settings"] - .get("ipConfiguration", {}) - .get("requireSsl", False), - ssl_mode=instance["settings"] - .get("ipConfiguration", {}) - .get("sslMode", "ALLOW_UNENCRYPTED_AND_ENCRYPTED"), - automated_backups=instance["settings"] - .get("backupConfiguration", {}) - .get("enabled", False), - authorized_networks=instance["settings"] - .get("ipConfiguration", {}) - .get("authorizedNetworks", []), - flags=instance["settings"].get("databaseFlags", []), + require_ssl=ip_config.get("requireSsl", False), + ssl_mode=ip_config.get( + "sslMode", "ALLOW_UNENCRYPTED_AND_ENCRYPTED" + ), + automated_backups=settings.get( + "backupConfiguration", {} + ).get("enabled", False), + authorized_networks=ip_config.get( + "authorizedNetworks", [] + ), + flags=settings.get("databaseFlags", []), + instance_type=instance.get( + "instanceType", "CLOUD_SQL_INSTANCE" + ), + cmek_key_name=instance.get( + "diskEncryptionConfiguration", {} + ).get("kmsKeyName"), project_id=project_id, ) ) @@ -68,4 +76,6 @@ class Instance(BaseModel): ssl_mode: str automated_backups: bool flags: list + instance_type: str = "CLOUD_SQL_INSTANCE" + cmek_key_name: Optional[str] = None project_id: str diff --git a/tests/providers/gcp/gcp_fixtures.py b/tests/providers/gcp/gcp_fixtures.py index 99eb88d25b..f46d1f82db 100644 --- a/tests/providers/gcp/gcp_fixtures.py +++ b/tests/providers/gcp/gcp_fixtures.py @@ -41,7 +41,7 @@ def set_mocked_gcp_provider( return provider -def mock_api_client(GCPService, service, api_version, _): +def mock_api_client(_GCPService, service, _api_version, _): client = MagicMock() mock_api_projects_calls(client) @@ -703,6 +703,9 @@ def mock_api_instances_calls(client: MagicMock, service: str): "databaseVersion": "MYSQL_5_7", "region": "us-central1", "ipAddresses": [{"type": "PRIMARY", "ipAddress": "66.66.66.66"}], + "diskEncryptionConfiguration": { + "kmsKeyName": "projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1" + }, "settings": { "ipConfiguration": { "requireSsl": True, @@ -1323,6 +1326,7 @@ def mock_api_images_calls(client: MagicMock): client.images().list_next.return_value = None def mock_get_image_iam_policy(project, resource): + del project return_value = MagicMock() if resource == "test-image-1": return_value.execute.return_value = { diff --git a/tests/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled_test.py b/tests/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled_test.py new file mode 100644 index 0000000000..e2e6e3f9e4 --- /dev/null +++ b/tests/providers/gcp/services/cloudsql/cloudsql_instance_cmek_encryption_enabled/cloudsql_instance_cmek_encryption_enabled_test.py @@ -0,0 +1,277 @@ +from unittest import mock +from unittest.mock import MagicMock, patch + +from tests.providers.gcp.gcp_fixtures import ( + GCP_EU1_LOCATION, + GCP_PROJECT_ID, + mock_is_api_active, + set_mocked_gcp_provider, +) + + +class Test_cloudsql_instance_cmek_encryption_enabled: + def test_no_instances(self): + cloudsql_client = mock.MagicMock() + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + + cloudsql_client.instances = [] + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 0 + + def test_instance_cmek_enabled(self): + cloudsql_client = mock.MagicMock() + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + from prowler.providers.gcp.services.cloudsql.cloudsql_service import ( + Instance, + ) + + cloudsql_client.instances = [ + Instance( + name="db-cmek", + version="POSTGRES_15", + ip_addresses=[], + region=GCP_EU1_LOCATION, + public_ip=False, + require_ssl=False, + ssl_mode="ENCRYPTED_ONLY", + automated_backups=True, + authorized_networks=[], + flags=[], + project_id=GCP_PROJECT_ID, + cmek_key_name="projects/123456789012/locations/europe-west1/keyRings/my-ring/cryptoKeys/my-key", + ) + ] + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "db-cmek" + assert result[0].location == GCP_EU1_LOCATION + assert result[0].project_id == GCP_PROJECT_ID + + def test_instance_cmek_not_configured(self): + cloudsql_client = mock.MagicMock() + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + from prowler.providers.gcp.services.cloudsql.cloudsql_service import ( + Instance, + ) + + cloudsql_client.instances = [ + Instance( + name="db-google-managed", + version="POSTGRES_15", + ip_addresses=[], + region=GCP_EU1_LOCATION, + public_ip=False, + require_ssl=False, + ssl_mode="ENCRYPTED_ONLY", + automated_backups=True, + authorized_networks=[], + flags=[], + project_id=GCP_PROJECT_ID, + cmek_key_name=None, + ) + ] + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "db-google-managed" + assert result[0].location == GCP_EU1_LOCATION + assert result[0].project_id == GCP_PROJECT_ID + + def test_instance_cmek_empty_string(self): + cloudsql_client = mock.MagicMock() + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + from prowler.providers.gcp.services.cloudsql.cloudsql_service import ( + Instance, + ) + + cloudsql_client.instances = [ + Instance( + name="db-empty-key", + version="POSTGRES_15", + ip_addresses=[], + region=GCP_EU1_LOCATION, + public_ip=False, + require_ssl=False, + ssl_mode="ENCRYPTED_ONLY", + automated_backups=True, + authorized_networks=[], + flags=[], + project_id=GCP_PROJECT_ID, + cmek_key_name="", + ) + ] + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "db-empty-key" + + def test_unsupported_instance_type_skipped(self): + cloudsql_client = mock.MagicMock() + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + from prowler.providers.gcp.services.cloudsql.cloudsql_service import ( + Instance, + ) + + cloudsql_client.instances = [ + Instance( + name="external-primary", + version="MYSQL_8_0", + ip_addresses=[], + region=GCP_EU1_LOCATION, + public_ip=False, + require_ssl=False, + ssl_mode="ENCRYPTED_ONLY", + automated_backups=False, + authorized_networks=[], + flags=[], + project_id=GCP_PROJECT_ID, + instance_type="ON_PREMISES_INSTANCE", + cmek_key_name=None, + ), + Instance( + name="db-cmek", + version="POSTGRES_15", + ip_addresses=[], + region=GCP_EU1_LOCATION, + public_ip=False, + require_ssl=False, + ssl_mode="ENCRYPTED_ONLY", + automated_backups=True, + authorized_networks=[], + flags=[], + project_id=GCP_PROJECT_ID, + instance_type="CLOUD_SQL_INSTANCE", + cmek_key_name="projects/p/locations/europe-west1/keyRings/r/cryptoKeys/k", + ), + ] + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].resource_id == "db-cmek" + assert result[0].status == "PASS" + + def test_service_parser_missing_disk_encryption(self): + """Exercise the real service parser path when diskEncryptionConfiguration is absent.""" + + def mock_api_client_without_disk_encryption(*_args, **_kwargs): + client = MagicMock() + client.instances().list().execute.return_value = { + "items": [ + { + "name": "db-no-encryption-config", + "databaseVersion": "POSTGRES_14", + "region": "us-central1", + "ipAddresses": [], + "settings": { + "ipConfiguration": {"requireSsl": True}, + "backupConfiguration": {"enabled": True}, + "databaseFlags": [], + }, + } + ] + } + client.instances().list_next.return_value = None + return client + + with ( + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", + new=mock_is_api_active, + ), + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", + new=mock_api_client_without_disk_encryption, + ), + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]), + ), + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_service import ( + CloudSQL, + ) + + cloudsql_client = CloudSQL( + set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) + ) + assert len(cloudsql_client.instances) == 1 + assert cloudsql_client.instances[0].cmek_key_name is None + + with patch( + "prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled.cloudsql_client", + new=cloudsql_client, + ): + from prowler.providers.gcp.services.cloudsql.cloudsql_instance_cmek_encryption_enabled.cloudsql_instance_cmek_encryption_enabled import ( + cloudsql_instance_cmek_encryption_enabled, + ) + + check = cloudsql_instance_cmek_encryption_enabled() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "db-no-encryption-config" diff --git a/tests/providers/gcp/services/cloudsql/cloudsql_service_test.py b/tests/providers/gcp/services/cloudsql/cloudsql_service_test.py index fcc6473559..b5bb846e9e 100644 --- a/tests/providers/gcp/services/cloudsql/cloudsql_service_test.py +++ b/tests/providers/gcp/services/cloudsql/cloudsql_service_test.py @@ -43,6 +43,11 @@ class TestCloudSQLService: {"value": "test"} ] assert cloudsql_client.instances[0].flags == [] + assert cloudsql_client.instances[0].instance_type == "CLOUD_SQL_INSTANCE" + assert ( + cloudsql_client.instances[0].cmek_key_name + == "projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1" + ) assert cloudsql_client.instances[0].project_id == GCP_PROJECT_ID assert cloudsql_client.instances[1].name == "instance2" @@ -62,12 +67,14 @@ class TestCloudSQLService: {"value": "test"} ] assert cloudsql_client.instances[1].flags == [] + assert cloudsql_client.instances[1].instance_type == "CLOUD_SQL_INSTANCE" + assert cloudsql_client.instances[1].cmek_key_name is None assert cloudsql_client.instances[1].project_id == GCP_PROJECT_ID def test_instances_without_backup_configuration(self): """Test that CloudSQL service handles instances without backupConfiguration field""" - def mock_api_client_without_backup_config(*args, **kwargs): + def mock_api_client_without_backup_config(*_args, **_kwargs): from unittest.mock import MagicMock client = MagicMock() @@ -119,7 +126,7 @@ class TestCloudSQLService: def test_instances_with_empty_backup_configuration(self): """Test that CloudSQL service handles instances with empty backupConfiguration""" - def mock_api_client_with_empty_backup_config(*args, **kwargs): + def mock_api_client_with_empty_backup_config(*_args, **_kwargs): from unittest.mock import MagicMock client = MagicMock() @@ -170,7 +177,7 @@ class TestCloudSQLService: def test_instances_without_settings_fields(self): """Test that CloudSQL service handles instances with minimal settings""" - def mock_api_client_with_minimal_settings(*args, **kwargs): + def mock_api_client_with_minimal_settings(*_args, **_kwargs): from unittest.mock import MagicMock client = MagicMock()