From ccc1f161d24c10d2c55bad9b35c533870852d229 Mon Sep 17 00:00:00 2001 From: s1ns3nz0 Date: Mon, 22 Jun 2026 17:26:03 +0900 Subject: [PATCH] feat(gcp): add cloudfunction_function_not_publicly_accessible check (#11022) Co-authored-by: Lydia Vilchez --- prowler/CHANGELOG.md | 1 + .../cloudfunction_function_inside_vpc.py | 6 +- .../__init__.py | 0 ...tion_not_publicly_accessible.metadata.json | 40 +++ ...nction_function_not_publicly_accessible.py | 44 ++++ .../cloudfunction/cloudfunction_service.py | 62 +++++ .../cloudfunction_function_inside_vpc_test.py | 11 + .../__init__.py | 0 ...n_function_not_publicly_accessible_test.py | 216 +++++++++++++++++ .../cloudfunction_service_test.py | 229 +++++++++++++++++- 10 files changed, 602 insertions(+), 7 deletions(-) create mode 100644 prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py create mode 100644 prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.metadata.json create mode 100644 prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.py create mode 100644 tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py create mode 100644 tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible_test.py diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 29b2fd9ecc..bdb5f9abf0 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -12,6 +12,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `sagemaker_clarify_exists` check for AWS provider [(#11211)](https://github.com/prowler-cloud/prowler/pull/11211) - `cloudsql_instance_high_availability_enabled` check for GCP provider, verifying Cloud SQL primary instances use `REGIONAL` availability for automatic zone failover [(#11024)](https://github.com/prowler-cloud/prowler/pull/11024) - `cloudfunction_function_inside_vpc` check for GCP provider, verifying Cloud Functions have a Serverless VPC Access connector for private egress [(#11021)](https://github.com/prowler-cloud/prowler/pull/11021) +- `cloudfunction_function_not_publicly_accessible` check for GCP provider, detecting Cloud Functions with `allUsers` or `allAuthenticatedUsers` IAM invocation bindings [(#11022)](https://github.com/prowler-cloud/prowler/pull/11022) - `identity_storage_service_level_admins_scoped` check for OCI provider CIS 3.1 control 1.15, ensuring storage service-level administrators exclude delete permissions [(#11523)](https://github.com/prowler-cloud/prowler/pull/11523) - `cosmosdb_account_automatic_failover_enabled` check for Azure provider [(#11031)](https://github.com/prowler-cloud/prowler/pull/11031) - `cosmosdb_account_backup_policy_continuous` check for Azure provider [(#11032)](https://github.com/prowler-cloud/prowler/pull/11032) diff --git a/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc.py b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc.py index 46866b350d..3b28db7d8e 100644 --- a/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc.py +++ b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc.py @@ -25,7 +25,11 @@ class cloudfunction_function_inside_vpc(Check): for function in cloudfunction_client.functions: if function.state != "ACTIVE": continue - report = Check_Report_GCP(metadata=self.metadata(), resource=function) + report = Check_Report_GCP( + metadata=self.metadata(), + resource=function, + resource_id=function.name, + ) if function.vpc_connector: report.status = "PASS" report.status_extended = ( diff --git a/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.metadata.json b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.metadata.json new file mode 100644 index 0000000000..8c9247b2bd --- /dev/null +++ b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.metadata.json @@ -0,0 +1,40 @@ +{ + "Provider": "gcp", + "CheckID": "cloudfunction_function_not_publicly_accessible", + "CheckTitle": "Cloud Function is not publicly invocable", + "CheckType": [], + "ServiceName": "cloudfunction", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "cloudfunctions.googleapis.com/Function", + "Description": "Cloud Functions deny invocation to `allUsers` and `allAuthenticatedUsers`, so only **explicitly authorized identities or services** can trigger them.\n\nThe evaluation reviews each function's IAM policy bindings to confirm no public principals are granted invoker access.", + "Risk": "Publicly invocable Cloud Functions expose **business logic** to the internet and let any caller trigger execution. This enables **unauthorized data access** when the function returns sensitive output, **code execution** in shared environments, and **denial-of-wallet** attacks driven by uncontrolled invocation costs.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://cloud.google.com/functions/docs/securing/authenticating", + "https://cloud.google.com/iam/docs/overview" + ], + "Remediation": { + "Code": { + "CLI": "gcloud functions remove-iam-policy-binding --region= --member= --role=roles/cloudfunctions.invoker", + "NativeIaC": "", + "Other": "1. In Google Cloud Console, go to Cloud Functions\n2. Select the function and open the Permissions tab\n3. Remove any binding with allUsers or allAuthenticatedUsers\n4. Grant invocation rights only to specific service accounts or user groups", + "Terraform": "```hcl\nresource \"google_cloudfunctions2_function_iam_binding\" \"\" {\n project = \"\"\n location = \"\"\n cloud_function = \"\"\n role = \"roles/cloudfunctions.invoker\"\n members = [\"serviceAccount:\"] # Critical: never include allUsers or allAuthenticatedUsers\n}\n```" + }, + "Recommendation": { + "Text": "Apply **least privilege** to Cloud Function invocation: grant `roles/cloudfunctions.invoker` only to specific service accounts or groups.\n\nFor externally exposed functions, front them with **API Gateway** or **Cloud Endpoints** that enforce authentication and rate limiting.", + "Url": "https://hub.prowler.com/check/cloudfunction_function_not_publicly_accessible" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [ + "cloudfunction_function_inside_vpc", + "secretmanager_secret_not_publicly_accessible", + "cloudstorage_bucket_public_access" + ], + "Notes": "This check evaluates function-level IAM policies. Organization policy constraints/iam.allowedPolicyMemberDomains can prevent public bindings at the org level." +} diff --git a/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.py b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.py new file mode 100644 index 0000000000..14ee874f22 --- /dev/null +++ b/prowler/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible.py @@ -0,0 +1,44 @@ +from prowler.lib.check.models import Check, Check_Report_GCP +from prowler.providers.gcp.services.cloudfunction.cloudfunction_client import ( + cloudfunction_client, +) + + +class cloudfunction_function_not_publicly_accessible(Check): + """Check that Cloud Functions do not grant invocation rights to all users. + + Verifies that no active Cloud Function has an IAM binding granting access + to `allUsers` or `allAuthenticatedUsers`. Non-`ACTIVE` functions are + skipped because their IAM bindings are transient. + """ + + def execute(self) -> list[Check_Report_GCP]: + """Execute the public-access check across all Cloud Functions. + + Returns: + A list of `Check_Report_GCP` findings, one per active Cloud + Function. Status is `FAIL` when the function is invokable by + `allUsers` or `allAuthenticatedUsers` and `PASS` otherwise. + """ + findings = [] + for function in cloudfunction_client.functions: + if function.state != "ACTIVE": + continue + report = Check_Report_GCP( + metadata=self.metadata(), + resource=function, + resource_id=function.name, + ) + if function.publicly_accessible: + report.status = "FAIL" + report.status_extended = ( + f"Cloud Function {function.name} is publicly invocable " + f"(allUsers or allAuthenticatedUsers IAM binding detected)." + ) + else: + report.status = "PASS" + report.status_extended = ( + f"Cloud Function {function.name} is not publicly accessible." + ) + findings.append(report) + return findings diff --git a/prowler/providers/gcp/services/cloudfunction/cloudfunction_service.py b/prowler/providers/gcp/services/cloudfunction/cloudfunction_service.py index d1043151b7..9905c98748 100644 --- a/prowler/providers/gcp/services/cloudfunction/cloudfunction_service.py +++ b/prowler/providers/gcp/services/cloudfunction/cloudfunction_service.py @@ -1,5 +1,6 @@ from typing import Optional +from googleapiclient import discovery from pydantic.v1 import BaseModel from prowler.lib.logger import logger @@ -20,7 +21,9 @@ class CloudFunction(GCPService): """Initialize the service and preload Cloud Functions.""" super().__init__("cloudfunctions", provider, api_version="v2") self.functions = [] + self._run_client = None self._get_functions() + self._get_functions_iam_policy() def _get_functions(self) -> None: """Fetch Cloud Functions for every project and location.""" @@ -47,10 +50,13 @@ class CloudFunction(GCPService): service_config = fn.get("serviceConfig", {}) self.functions.append( Function( + id=fn["name"], name=fn["name"].split("/")[-1], project_id=project_id, location=location_id, state=fn.get("state", "UNKNOWN"), + environment=fn.get("environment", "GEN_1"), + service=service_config.get("service"), vpc_connector=service_config.get( "vpcConnector" ), @@ -73,12 +79,68 @@ class CloudFunction(GCPService): f"{project_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _get_functions_iam_policy(self) -> None: + """Fetch IAM policy for every Cloud Function in parallel. + + For gen2 functions, IAM is delegated to the underlying Cloud Run + service, so a `run.googleapis.com` v2 client is required. + """ + if any(f.environment == "GEN_2" for f in self.functions): + self._run_client = discovery.build( + "run", + "v2", + credentials=self.credentials, + num_retries=DEFAULT_RETRY_ATTEMPTS, + ) + self.__threading_call__(self._get_function_iam_policy, self.functions) + + def _get_function_iam_policy(self, function: "Function") -> None: + """Mark a Cloud Function as publicly accessible when bound to `allUsers` or `allAuthenticatedUsers`. + + Cloud Functions gen2 delegates invocation IAM to its backing Cloud Run + service, so the binding is queried via the Run API. Gen1 functions are + queried through the Cloud Functions API directly. + """ + try: + if function.environment == "GEN_2" and function.service: + response = ( + self._run_client.projects() + .locations() + .services() + .getIamPolicy(resource=function.service) + .execute(num_retries=DEFAULT_RETRY_ATTEMPTS) + ) + else: + response = ( + self.client.projects() + .locations() + .functions() + .getIamPolicy(resource=function.id) + .execute( + http=self.__get_AuthorizedHttp_client__(), + num_retries=DEFAULT_RETRY_ATTEMPTS, + ) + ) + for binding in response.get("bindings", []): + members = binding.get("members", []) + if "allUsers" in members or "allAuthenticatedUsers" in members: + function.publicly_accessible = True + break + except Exception as error: + logger.error( + f"{function.location} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class Function(BaseModel): """Cloud Function resource consumed by GCP checks.""" + id: str name: str project_id: str location: str state: str + environment: str = "GEN_1" + service: Optional[str] = None vpc_connector: Optional[str] = None + publicly_accessible: bool = False diff --git a/tests/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc_test.py b/tests/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc_test.py index b764760538..8428e56d86 100644 --- a/tests/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc_test.py +++ b/tests/providers/gcp/services/cloudfunction/cloudfunction_function_inside_vpc/cloudfunction_function_inside_vpc_test.py @@ -13,6 +13,13 @@ _CHECK_PATH = ( _CLIENT_PATH = f"{_CHECK_PATH}.cloudfunction_client" +def _function_id(name: str) -> str: + return ( + f"projects/{GCP_PROJECT_ID}/locations/{GCP_US_CENTER1_LOCATION}" + f"/functions/{name}" + ) + + class Test_cloudfunction_function_inside_vpc: def test_no_functions(self): cloudfunction_client = mock.MagicMock() @@ -63,6 +70,7 @@ class Test_cloudfunction_function_inside_vpc: ) cloudfunction_client.functions = [ Function( + id=_function_id("fn-vpc"), name="fn-vpc", project_id=GCP_PROJECT_ID, location=GCP_US_CENTER1_LOCATION, @@ -106,6 +114,7 @@ class Test_cloudfunction_function_inside_vpc: cloudfunction_client.functions = [ Function( + id=_function_id("fn-public"), name="fn-public", project_id=GCP_PROJECT_ID, location=GCP_US_CENTER1_LOCATION, @@ -149,6 +158,7 @@ class Test_cloudfunction_function_inside_vpc: cloudfunction_client.functions = [ Function( + id=_function_id("fn-empty"), name="fn-empty", project_id=GCP_PROJECT_ID, location=GCP_US_CENTER1_LOCATION, @@ -184,6 +194,7 @@ class Test_cloudfunction_function_inside_vpc: cloudfunction_client.functions = [ Function( + id=_function_id("fn-deploy"), name="fn-deploy", project_id=GCP_PROJECT_ID, location=GCP_US_CENTER1_LOCATION, diff --git a/tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py b/tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible_test.py b/tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible_test.py new file mode 100644 index 0000000000..615890bd58 --- /dev/null +++ b/tests/providers/gcp/services/cloudfunction/cloudfunction_function_not_publicly_accessible/cloudfunction_function_not_publicly_accessible_test.py @@ -0,0 +1,216 @@ +from unittest import mock + +from tests.providers.gcp.gcp_fixtures import ( + GCP_PROJECT_ID, + GCP_US_CENTER1_LOCATION, + set_mocked_gcp_provider, +) + +_CHECK_PATH = ( + "prowler.providers.gcp.services.cloudfunction." + "cloudfunction_function_not_publicly_accessible." + "cloudfunction_function_not_publicly_accessible" +) +_CLIENT_PATH = f"{_CHECK_PATH}.cloudfunction_client" + + +def _function_id(name: str) -> str: + return ( + f"projects/{GCP_PROJECT_ID}/locations/{GCP_US_CENTER1_LOCATION}" + f"/functions/{name}" + ) + + +class Test_cloudfunction_function_not_publicly_accessible: + def test_no_functions(self): + cloudfunction_client = mock.MagicMock() + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + _CLIENT_PATH, + new=cloudfunction_client, + ), + ): + from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_not_publicly_accessible.cloudfunction_function_not_publicly_accessible import ( + cloudfunction_function_not_publicly_accessible, + ) + + cloudfunction_client.functions = [] + + check = cloudfunction_function_not_publicly_accessible() + result = check.execute() + assert len(result) == 0 + + def test_function_private(self): + cloudfunction_client = mock.MagicMock() + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + _CLIENT_PATH, + new=cloudfunction_client, + ), + ): + from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_not_publicly_accessible.cloudfunction_function_not_publicly_accessible import ( + cloudfunction_function_not_publicly_accessible, + ) + from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import ( + Function, + ) + + cloudfunction_client.functions = [ + Function( + id=_function_id("fn-private"), + name="fn-private", + project_id=GCP_PROJECT_ID, + location=GCP_US_CENTER1_LOCATION, + state="ACTIVE", + publicly_accessible=False, + ) + ] + + check = cloudfunction_function_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Cloud Function fn-private is not publicly accessible." + ) + assert result[0].resource_id == "fn-private" + assert result[0].resource_name == "fn-private" + assert result[0].location == GCP_US_CENTER1_LOCATION + assert result[0].project_id == GCP_PROJECT_ID + + def test_function_publicly_accessible(self): + cloudfunction_client = mock.MagicMock() + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + _CLIENT_PATH, + new=cloudfunction_client, + ), + ): + from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_not_publicly_accessible.cloudfunction_function_not_publicly_accessible import ( + cloudfunction_function_not_publicly_accessible, + ) + from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import ( + Function, + ) + + cloudfunction_client.functions = [ + Function( + id=_function_id("fn-public"), + name="fn-public", + project_id=GCP_PROJECT_ID, + location=GCP_US_CENTER1_LOCATION, + state="ACTIVE", + publicly_accessible=True, + ) + ] + + check = cloudfunction_function_not_publicly_accessible() + result = check.execute() + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "Cloud Function fn-public is publicly invocable " + "(allUsers or allAuthenticatedUsers IAM binding detected)." + ) + assert result[0].resource_id == "fn-public" + assert result[0].resource_name == "fn-public" + assert result[0].location == GCP_US_CENTER1_LOCATION + assert result[0].project_id == GCP_PROJECT_ID + + def test_multiple_functions_mixed(self): + cloudfunction_client = mock.MagicMock() + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + _CLIENT_PATH, + new=cloudfunction_client, + ), + ): + from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_not_publicly_accessible.cloudfunction_function_not_publicly_accessible import ( + cloudfunction_function_not_publicly_accessible, + ) + from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import ( + Function, + ) + + cloudfunction_client.functions = [ + Function( + id=_function_id("fn-private"), + name="fn-private", + project_id=GCP_PROJECT_ID, + location=GCP_US_CENTER1_LOCATION, + state="ACTIVE", + publicly_accessible=False, + ), + Function( + id=_function_id("fn-public"), + name="fn-public", + project_id=GCP_PROJECT_ID, + location=GCP_US_CENTER1_LOCATION, + state="ACTIVE", + publicly_accessible=True, + ), + ] + + check = cloudfunction_function_not_publicly_accessible() + result = check.execute() + assert len(result) == 2 + + by_id = {r.resource_id: r for r in result} + assert by_id["fn-private"].status == "PASS" + assert by_id["fn-public"].status == "FAIL" + + def test_inactive_function_skipped(self): + cloudfunction_client = mock.MagicMock() + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_gcp_provider(), + ), + mock.patch( + _CLIENT_PATH, + new=cloudfunction_client, + ), + ): + from prowler.providers.gcp.services.cloudfunction.cloudfunction_function_not_publicly_accessible.cloudfunction_function_not_publicly_accessible import ( + cloudfunction_function_not_publicly_accessible, + ) + from prowler.providers.gcp.services.cloudfunction.cloudfunction_service import ( + Function, + ) + + cloudfunction_client.functions = [ + Function( + id=_function_id("fn-deleting"), + name="fn-deleting", + project_id=GCP_PROJECT_ID, + location=GCP_US_CENTER1_LOCATION, + state="DELETING", + publicly_accessible=True, + ) + ] + + check = cloudfunction_function_not_publicly_accessible() + result = check.execute() + assert len(result) == 0 diff --git a/tests/providers/gcp/services/cloudfunction/cloudfunction_service_test.py b/tests/providers/gcp/services/cloudfunction/cloudfunction_service_test.py index 82f77a939f..d97b80336b 100644 --- a/tests/providers/gcp/services/cloudfunction/cloudfunction_service_test.py +++ b/tests/providers/gcp/services/cloudfunction/cloudfunction_service_test.py @@ -11,12 +11,18 @@ from tests.providers.gcp.gcp_fixtures import ( _LOCATION_ID = "us-central1" _FUNCTION_NAME = "my-function" +_FUNCTION_ID = ( + f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/functions/{_FUNCTION_NAME}" +) +_RUN_SERVICE = ( + f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/services/{_FUNCTION_NAME}" +) _CONNECTOR = ( f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/connectors/my-connector" ) -def _make_cloudfunction_client(functions_list): +def _make_cloudfunction_client(functions_list, iam_bindings=None): """Return a mock GCP API client for the Cloud Functions v2 service.""" client = MagicMock() @@ -30,6 +36,29 @@ def _make_cloudfunction_client(functions_list): } client.projects().locations().functions().list_next.return_value = None + iam_response = {"bindings": iam_bindings or []} + + def mock_get_iam_policy(resource): + rv = MagicMock() + rv.execute.return_value = iam_response + return rv + + client.projects().locations().functions().getIamPolicy = mock_get_iam_policy + + return client + + +def _make_run_client(iam_bindings=None): + """Return a mock Cloud Run v2 client for gen2 IAM policy lookups.""" + client = MagicMock() + iam_response = {"bindings": iam_bindings or []} + + def mock_get_iam_policy(resource): + rv = MagicMock() + rv.execute.return_value = iam_response + return rv + + client.projects().locations().services().getIamPolicy = mock_get_iam_policy return client @@ -39,9 +68,11 @@ class TestCloudFunctionService: return _make_cloudfunction_client( functions_list=[ { - "name": f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/functions/{_FUNCTION_NAME}", + "name": _FUNCTION_ID, "state": "ACTIVE", + "environment": "GEN_2", "serviceConfig": { + "service": _RUN_SERVICE, "vpcConnector": _CONNECTOR, }, } @@ -57,6 +88,10 @@ class TestCloudFunctionService: "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", new=mock_api_client, ), + patch( + "prowler.providers.gcp.services.cloudfunction.cloudfunction_service.discovery.build", + return_value=_make_run_client(), + ), ): cf_client = CloudFunction( set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) @@ -64,11 +99,15 @@ class TestCloudFunctionService: assert len(cf_client.functions) == 1 fn = cf_client.functions[0] + assert fn.id == _FUNCTION_ID assert fn.name == _FUNCTION_NAME assert fn.project_id == GCP_PROJECT_ID assert fn.location == _LOCATION_ID assert fn.state == "ACTIVE" + assert fn.environment == "GEN_2" + assert fn.service == _RUN_SERVICE assert fn.vpc_connector == _CONNECTOR + assert fn.publicly_accessible is False def test_get_functions_without_vpc_connector(self): def mock_api_client(*args, **kwargs): @@ -77,11 +116,190 @@ class TestCloudFunctionService: { "name": f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/functions/no-vpc-func", "state": "ACTIVE", - "serviceConfig": {}, + "environment": "GEN_2", + "serviceConfig": { + "service": f"projects/{GCP_PROJECT_ID}/locations/{_LOCATION_ID}/services/no-vpc-func", + }, } ] ) + with ( + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", + new=mock_is_api_active, + ), + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", + new=mock_api_client, + ), + patch( + "prowler.providers.gcp.services.cloudfunction.cloudfunction_service.discovery.build", + return_value=_make_run_client(), + ), + ): + cf_client = CloudFunction( + set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) + ) + + assert len(cf_client.functions) == 1 + fn = cf_client.functions[0] + assert fn.name == "no-vpc-func" + assert fn.vpc_connector is None + assert fn.publicly_accessible is False + + def test_get_functions_iam_policy_gen2_all_users(self): + """Gen2 functions: allUsers binding lives on the Cloud Run service.""" + + def mock_api_client(*args, **kwargs): + return _make_cloudfunction_client( + functions_list=[ + { + "name": _FUNCTION_ID, + "state": "ACTIVE", + "environment": "GEN_2", + "serviceConfig": {"service": _RUN_SERVICE}, + } + ] + ) + + run_client = _make_run_client( + iam_bindings=[ + { + "role": "roles/run.invoker", + "members": ["allUsers"], + } + ] + ) + + with ( + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", + new=mock_is_api_active, + ), + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", + new=mock_api_client, + ), + patch( + "prowler.providers.gcp.services.cloudfunction.cloudfunction_service.discovery.build", + return_value=run_client, + ), + ): + cf_client = CloudFunction( + set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) + ) + + assert len(cf_client.functions) == 1 + assert cf_client.functions[0].publicly_accessible is True + + def test_get_functions_iam_policy_gen2_all_authenticated_users(self): + def mock_api_client(*args, **kwargs): + return _make_cloudfunction_client( + functions_list=[ + { + "name": _FUNCTION_ID, + "state": "ACTIVE", + "environment": "GEN_2", + "serviceConfig": {"service": _RUN_SERVICE}, + } + ] + ) + + run_client = _make_run_client( + iam_bindings=[ + { + "role": "roles/run.invoker", + "members": ["allAuthenticatedUsers"], + } + ] + ) + + with ( + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", + new=mock_is_api_active, + ), + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", + new=mock_api_client, + ), + patch( + "prowler.providers.gcp.services.cloudfunction.cloudfunction_service.discovery.build", + return_value=run_client, + ), + ): + cf_client = CloudFunction( + set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) + ) + + assert len(cf_client.functions) == 1 + assert cf_client.functions[0].publicly_accessible is True + + def test_get_functions_iam_policy_gen2_not_public(self): + def mock_api_client(*args, **kwargs): + return _make_cloudfunction_client( + functions_list=[ + { + "name": _FUNCTION_ID, + "state": "ACTIVE", + "environment": "GEN_2", + "serviceConfig": {"service": _RUN_SERVICE}, + } + ] + ) + + run_client = _make_run_client( + iam_bindings=[ + { + "role": "roles/run.invoker", + "members": ["serviceAccount:sa@project.iam.gserviceaccount.com"], + } + ] + ) + + with ( + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", + new=mock_is_api_active, + ), + patch( + "prowler.providers.gcp.lib.service.service.GCPService.__generate_client__", + new=mock_api_client, + ), + patch( + "prowler.providers.gcp.services.cloudfunction.cloudfunction_service.discovery.build", + return_value=run_client, + ), + ): + cf_client = CloudFunction( + set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID]) + ) + + assert len(cf_client.functions) == 1 + assert cf_client.functions[0].publicly_accessible is False + + def test_get_functions_iam_policy_gen1_all_users(self): + """Gen1 functions: IAM binding lives on the Cloud Functions resource itself.""" + + def mock_api_client(*args, **kwargs): + return _make_cloudfunction_client( + functions_list=[ + { + "name": _FUNCTION_ID, + "state": "ACTIVE", + "environment": "GEN_1", + "serviceConfig": {}, + } + ], + iam_bindings=[ + { + "role": "roles/cloudfunctions.invoker", + "members": ["allUsers"], + } + ], + ) + with ( patch( "prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__", @@ -97,6 +315,5 @@ class TestCloudFunctionService: ) assert len(cf_client.functions) == 1 - fn = cf_client.functions[0] - assert fn.name == "no-vpc-func" - assert fn.vpc_connector is None + assert cf_client.functions[0].environment == "GEN_1" + assert cf_client.functions[0].publicly_accessible is True