feat(gcp): add service account credentials (#6165)

This commit is contained in:
Pedro Martín
2024-12-16 16:11:32 +01:00
committed by GitHub
parent 36e61cb7a2
commit 396e51c27d
3 changed files with 96 additions and 28 deletions

View File

@@ -30,9 +30,9 @@ class GCPBaseException(ProwlerException):
"message": "Error testing connection to GCP",
"remediation": "Check the connection and ensure it is properly set up.",
},
(3006, "GCPLoadCredentialsFromDictError"): {
"message": "Error loading credentials from dictionary",
"remediation": "Check the credentials and ensure they are properly set up. client_id, client_secret and refresh_token are required.",
(3006, "GCPLoadADCFromDictError"): {
"message": "Error loading Application Default Credentials from dictionary",
"remediation": "Check the dictionary and ensure a valid Application Default Credentials are present with client_id, client_secret and refresh_token keys.",
},
(3007, "GCPStaticCredentialsError"): {
"message": "Error loading static credentials",
@@ -46,6 +46,10 @@ class GCPBaseException(ProwlerException):
"message": "Cloud Asset API not used",
"remediation": "Enable the Cloud Asset API for the project.",
},
(3010, "GCPLoadServiceAccountKeyFromDictError"): {
"message": "Error loading Service Account Private Key credentials from dictionary",
"remediation": "Check the dictionary and ensure it contains a Service Account Private Key.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -111,7 +115,7 @@ class GCPTestConnectionError(GCPBaseException):
)
class GCPLoadCredentialsFromDictError(GCPCredentialsError):
class GCPLoadADCFromDictError(GCPCredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
3006, file=file, original_exception=original_exception, message=message
@@ -137,3 +141,10 @@ class GCPCloudAssetAPINotUsedError(GCPBaseException):
super().__init__(
3009, file=file, original_exception=original_exception, message=message
)
class GCPLoadServiceAccountKeyFromDictError(GCPCredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
3010, file=file, original_exception=original_exception, message=message
)

View File

@@ -25,7 +25,8 @@ from prowler.providers.gcp.exceptions.exceptions import (
GCPGetProjectError,
GCPHTTPError,
GCPInvalidProviderIdError,
GCPLoadCredentialsFromDictError,
GCPLoadADCFromDictError,
GCPLoadServiceAccountKeyFromDictError,
GCPNoAccesibleProjectsError,
GCPSetUpSessionError,
GCPStaticCredentialsError,
@@ -57,7 +58,6 @@ class GcpProvider(Provider):
- get_projects -> Get the projects accessible by the provided credentials
- update_projects_with_organizations -> Update the projects with organizations
- is_project_matching -> Check if the input project matches the project to match
- validate_static_arguments -> Validate the static arguments
- validate_project_id -> Validate the provider ID
"""
@@ -87,6 +87,7 @@ class GcpProvider(Provider):
client_id: str = None,
client_secret: str = None,
refresh_token: str = None,
service_account_key: dict = None,
):
"""
GCP Provider constructor
@@ -106,11 +107,12 @@ class GcpProvider(Provider):
client_id: str
client_secret: str
refresh_token: str
service_account_key: dict
Raises:
GCPNoAccesibleProjectsError if no project IDs can be accessed via Google Credentials
GCPSetUpSessionError if an error occurs during the setup session
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPLoadADCFromDictError if an error occurs during the loading credentials from dict
GCPGetProjectError if an error occurs during the get project
Returns:
@@ -130,6 +132,10 @@ class GcpProvider(Provider):
... client_secret="client_secret",
... refresh_token="refresh_token"
... )
- Using the service account key:
>>> GcpProvider(
... service_account_key={"service_account_key": "service_account_key"}
... )
- Using a credentials file:
>>> GcpProvider(
... credentials_file="credentials_file"
@@ -167,7 +173,10 @@ class GcpProvider(Provider):
)
self._session, self._default_project_id = self.setup_session(
credentials_file, self._impersonated_service_account, gcp_credentials
credentials_file=credentials_file,
service_account=self._impersonated_service_account,
gcp_credentials=gcp_credentials,
service_account_key=service_account_key,
)
self._project_ids = []
@@ -312,25 +321,39 @@ class GcpProvider(Provider):
@staticmethod
def setup_session(
credentials_file: str, service_account: str, gcp_credentials: dict = None
credentials_file: str,
service_account: str,
gcp_credentials: dict = None,
service_account_key: dict = None,
) -> tuple:
"""
Setup the GCP session with the provided credentials file or service account to impersonate
Args:
credentials_file: str
service_account: str
credentials_file: str -> The credentials file path used to authenticate
service_account: dict -> The service account to impersonate
gcp_credentials: dict -> The GCP credentials following the format:
{
"client_id": str,
"client_secret": str,
"refresh_token": str,
"type": str
}
service_account_key: dict -> The service account key, used to authenticate
Returns:
Credentials object and default project ID
Raises:
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPLoadADCFromDictError if an error occurs during the loading credentials from dict
GCPLoadServiceAccountKeyFromDictError if an error occurs during the loading credentials from the service account key
GCPSetUpSessionError if an error occurs during the setup session
Usage:
>>> GcpProvider.setup_session(credentials_file, service_account)
>>> GcpProvider.setup_session(service_account, gcp_credentials)
>>> GcpProvider.setup_session(service_account, service_account_key)
>>> GcpProvider.setup_session(credentials_file, service_account, gcp_credentials)
"""
try:
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
@@ -347,7 +370,24 @@ class GcpProvider(Provider):
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
raise GCPLoadCredentialsFromDictError(
raise GCPLoadADCFromDictError(
file=__file__, original_exception=error
)
if service_account_key:
logger.info(
"GCP provider: Setting credentials from service account key..."
)
try:
credentials, default_project_id = load_credentials_from_dict(
service_account_key, scopes=scopes
)
return credentials, default_project_id
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
raise GCPLoadServiceAccountKeyFromDictError(
file=__file__, original_exception=error
)
@@ -388,10 +428,11 @@ class GcpProvider(Provider):
credentials_file: str = None,
service_account: str = None,
raise_on_exception: bool = True,
provider_id: Optional[str] = None,
client_id: str = None,
client_secret: str = None,
refresh_token: str = None,
provider_id: Optional[str] = None,
service_account_key: dict = None,
) -> Connection:
"""
Test the connection to GCP with the provided credentials file or service account to impersonate.
@@ -403,16 +444,18 @@ class GcpProvider(Provider):
credentials_file: str
service_account: str
raise_on_exception: bool
provider_id: Optional[str] -> The provider ID, for GCP it is the project ID
client_id: str
client_secret: str
refresh_token: str
provider_id: Optional[str] -> The provider ID, for GCP it is the project ID
service_account_key: dict
Returns:
Connection object with is_connected set to True if the connection is successful, or error set to the exception if the connection fails
Raises:
GCPLoadCredentialsFromDictError if an error occurs during the loading credentials from dict
GCPLoadADCFromDictError if an error occurs during the loading credentials from dict
GCPLoadServiceAccountKeyFromDictError if an error occurs during the loading credentials from dict
GCPSetUpSessionError if an error occurs during the setup session
GCPCloudResourceManagerAPINotUsedError if the Cloud Resource Manager API has not been used before or it is disabled
GCPInvalidProviderIdError if the provider ID does not match with the expected project_id
@@ -425,10 +468,6 @@ class GcpProvider(Provider):
... client_secret="client_secret",
... refresh_token="refresh_token"
... )
- Using a Service Account credentials file path:
>>> GcpProvider.test_connection(
... credentials_file="credentials_file"
... )
- Using ADC credentials with a Service Account to impersonate:
>>> GcpProvider.test_connection(
... client_id="client_id",
@@ -436,6 +475,14 @@ class GcpProvider(Provider):
... refresh_token="refresh_token",
... service_account="service_account"
... )
- Using service account key:
>>> GcpProvider.test_connection(
... service_account_key={"service_account_key": "service_account_key"}
... )
- Using a Service Account credentials file path:
>>> GcpProvider.test_connection(
... credentials_file="credentials_file"
... )
"""
try:
# Set the GCP credentials using the provided client_id, client_secret and refresh_token from ADC
@@ -444,9 +491,11 @@ class GcpProvider(Provider):
gcp_credentials = GcpProvider.validate_static_arguments(
client_id, client_secret, refresh_token
)
session, project_id = GcpProvider.setup_session(
credentials_file, service_account, gcp_credentials
credentials_file=credentials_file,
service_account=service_account,
gcp_credentials=gcp_credentials,
service_account_key=service_account_key,
)
if provider_id and project_id != provider_id:
# Logic to check if the provider ID matches the project ID
@@ -460,7 +509,7 @@ class GcpProvider(Provider):
return Connection(is_connected=True)
# Errors from setup_session
except GCPLoadCredentialsFromDictError as load_credentials_error:
except GCPLoadServiceAccountKeyFromDictError as load_credentials_error:
logger.critical(
f"{load_credentials_error.__class__.__name__}[{load_credentials_error.__traceback__.tb_lineno}]: {load_credentials_error}"
)
@@ -741,18 +790,14 @@ class GcpProvider(Provider):
) -> dict:
"""
Validate the static arguments client_id, client_secret and refresh_token of ADC credentials
Args:
client_id: str
client_secret: str
refresh_token: str
Returns:
dict
Raises:
GCPStaticCredentialsError if any of the static arguments is missing from the ADC credentials
Usage:
>>> GcpProvider.validate_static_arguments(client_id, client_secret, refresh_token)
"""

View File

@@ -817,6 +817,18 @@ class TestGCPProvider:
assert e.type == GCPTestConnectionError
assert "Test exception" in e.value.args[0]
def test_test_connection_with_exception_service_account_key(self):
with patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.setup_session",
side_effect=Exception("Test exception"),
):
with pytest.raises(Exception) as e:
GcpProvider.test_connection(
service_account_key={"test": "key"},
)
assert e.type == GCPTestConnectionError
assert "Test exception" in e.value.args[0]
def test_test_connection_valid_project_id(self):
project_id = "test-project-id"
mocked_service = MagicMock()