feat(gcp): improve Google Projects scan customization (#3741)

This commit is contained in:
Sergio Garcia
2024-04-10 13:16:47 +02:00
committed by GitHub
parent f13c843ba6
commit 8758ecae97
6 changed files with 182 additions and 11 deletions
+28
View File
@@ -0,0 +1,28 @@
# GCP Projects
By default, Prowler is multi-project, which means that is going to scan all the Google Cloud projects that the authenticated user has access to. If you want to scan a specific project(s), you can use the `--project-ids` argument.
```console
prowler gcp --project-ids project-id1 project-id2
```
???+ note
You can use asterisk `*` to scan projects that match a pattern. For example, `prowler gcp --project-ids "prowler*"` will scan all the projects that start with `prowler`.
???+ note
If you want to know the projects that you have access to, you can use the following command:
```console
prowler gcp --list-project-ids
```
### Exclude Projects
If you want to exclude some projects from the scan, you can use the `--exclude-project-ids` argument.
```console
prowler gcp --exclude-project-ids project-id1 project-id2
```
???+ note
You can use asterisk `*` to exclude projects that match a pattern. For example, `prowler gcp --exclude-project-ids "sys*"` will exclude all the projects that start with `sys`.
+1
View File
@@ -83,6 +83,7 @@ nav:
- Subscriptions: tutorials/azure/subscriptions.md
- Google Cloud:
- Authentication: tutorials/gcp/authentication.md
- Projects: tutorials/gcp/projects.md
- Developer Guide:
- Introduction: developer-guide/introduction.md
- Audit Info: developer-guide/audit-info.md
+60 -8
View File
@@ -1,4 +1,5 @@
import os
import re
import sys
from colorama import Fore, Style
@@ -25,6 +26,7 @@ class GcpProvider(Provider):
_type: str = "gcp"
_session: Credentials
_project_ids: list
_excluded_project_ids: list
_identity: GCPIdentityInfo
_audit_config: dict
_output_options: GCPOutputOptions
@@ -35,12 +37,15 @@ class GcpProvider(Provider):
def __init__(self, arguments):
logger.info("Instantiating GCP Provider ...")
input_project_ids = arguments.project_id
excluded_project_ids = arguments.excluded_project_id
credentials_file = arguments.credentials_file
list_project_ids = arguments.list_project_id
self._session = self.setup_session(credentials_file)
self._project_ids = []
self._projects = {}
self._excluded_project_ids = []
accessible_projects = self.get_projects()
if not accessible_projects:
logger.critical("No Project IDs can be accessed via Google Credentials.")
@@ -48,20 +53,42 @@ class GcpProvider(Provider):
if input_project_ids:
for input_project in input_project_ids:
if input_project in accessible_projects.keys():
self._projects[input_project] = accessible_projects[input_project]
self._project_ids.append(accessible_projects[input_project].id)
else:
logger.critical(
f"Project {input_project} cannot be accessed via Google Credentials."
)
sys.exit(1)
for accessible_project in accessible_projects:
if self.is_project_matching(input_project, accessible_project):
self._projects[accessible_project] = accessible_projects[
accessible_project
]
self._project_ids.append(
accessible_projects[accessible_project].id
)
else:
# If not projects were input, all accessible projects are scanned by default
for project_id, project in accessible_projects.items():
self._projects[project_id] = project
self._project_ids.append(project_id)
# Remove excluded projects if any input
if excluded_project_ids:
for excluded_project in excluded_project_ids:
for project_id in self._project_ids:
if self.is_project_matching(excluded_project, project_id):
self._excluded_project_ids.append(project_id)
for project_id in self._excluded_project_ids:
self._projects.pop(project_id)
self._project_ids.remove(project_id)
if not self._projects:
logger.critical(
"No Input Project IDs can be accessed via Google Credentials."
)
sys.exit(1)
if list_project_ids:
print(
f"{Fore.YELLOW}Available GCP Project IDs{Style.RESET_ALL}:\n{' '.join(self._project_ids)}\n"
)
sys.exit(0)
# Update organizations info
self.update_projects_with_organizations()
@@ -95,6 +122,10 @@ class GcpProvider(Provider):
def project_ids(self):
return self._project_ids
@property
def excluded_project_ids(self):
return self._excluded_project_ids
@property
def audit_config(self):
return self._audit_config
@@ -172,6 +203,10 @@ class GcpProvider(Provider):
f"GCP Account: {Fore.YELLOW}{self.identity.profile}{Style.RESET_ALL}",
f"GCP Project IDs: {Fore.YELLOW}{', '.join(self.project_ids)}{Style.RESET_ALL}",
]
if self.excluded_project_ids:
report_lines.append(
f"Excluded GCP Project IDs: {Fore.YELLOW}{', '.join(self.excluded_project_ids)}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the GCP credentials below:{Style.RESET_ALL}"
)
@@ -269,3 +304,20 @@ class GcpProvider(Provider):
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def is_project_matching(self, input_project: str, project_to_match: str) -> bool:
"""
Check if the input project matches the project to match
Args:
input_project: str
project_to_match: str
Returns:
bool
"""
return (
"*" in input_project
and re.search(
"." + input_project if input_project.startswith("*") else input_project,
project_to_match,
)
) or input_project == project_to_match
@@ -12,12 +12,25 @@ def init_parser(self):
metavar="FILE_PATH",
help="Authenticate using a Google Service Account Application Credentials JSON file",
)
# Subscriptions
gcp_subscriptions_subparser = gcp_parser.add_argument_group("Projects")
gcp_subscriptions_subparser.add_argument(
# Projects
gcp_projects_subparser = gcp_parser.add_argument_group("Projects")
gcp_projects_subparser.add_argument(
"--project-id",
"--project-ids",
nargs="+",
default=[],
help="GCP Project IDs to be scanned by Prowler",
)
gcp_projects_subparser.add_argument(
"--excluded-project-id",
"--excluded-project-ids",
nargs="+",
default=[],
help="Excluded GCP Project IDs to be scanned by Prowler",
)
gcp_projects_subparser.add_argument(
"--list-project-id",
"--list-project-ids",
action="store_true",
help="List available project IDs in Google Cloud which can be scanned by Prowler",
)
+18
View File
@@ -1188,6 +1188,24 @@ class Test_Parser:
assert parsed.project_id[0] == project_1
assert parsed.project_id[1] == project_2
def test_parser_gcp_excluded_project_id(self):
argument = "--excluded-project-id"
project_1 = "test_project_1"
project_2 = "test_project_2"
command = [prowler_command, "gcp", argument, project_1, project_2]
parsed = self.parser.parse(command)
assert parsed.provider == "gcp"
assert len(parsed.excluded_project_id) == 2
assert parsed.excluded_project_id[0] == project_1
assert parsed.excluded_project_id[1] == project_2
def test_parser_gcp_list_project_id(self):
argument = "--list-project-id"
command = [prowler_command, "gcp", argument]
parsed = self.parser.parse(command)
assert parsed.provider == "gcp"
assert parsed.list_project_id
def test_parser_kubernetes_auth_kubeconfig_file(self):
argument = "--kubeconfig-file"
file = "config"
+59
View File
@@ -14,6 +14,8 @@ class TestGCPProvider:
def test_gcp_provider(self):
arguments = Namespace()
arguments.project_id = []
arguments.excluded_project_id = []
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
@@ -47,6 +49,8 @@ class TestGCPProvider:
def test_gcp_provider_output_options(self):
arguments = Namespace()
arguments.project_id = []
arguments.excluded_project_id = []
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
@@ -106,3 +110,58 @@ class TestGCPProvider:
# TODO: move this to a fixtures file
rmdir(f"{arguments.output_directory}/compliance")
rmdir(arguments.output_directory)
@freeze_time(datetime.today())
def test_is_project_matching(self):
arguments = Namespace()
arguments.project_id = []
arguments.excluded_project_id = []
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
# Output options
arguments.status = []
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"
projects = {
"test-project": GCPProject(
number="55555555",
id="project/55555555",
name="test-project",
labels="",
lifecycle_state="",
)
}
with patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.setup_session",
return_value=None,
), patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.get_projects",
return_value=projects,
), patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.update_projects_with_organizations",
return_value=None,
):
gcp_provider = GcpProvider(arguments)
gcp_provider.output_options = arguments, {}
input_project = "sys-*"
project_to_match = "sys-12345678"
assert gcp_provider.is_project_matching(input_project, project_to_match)
input_project = "*prowler"
project_to_match = "test-prowler"
assert gcp_provider.is_project_matching(input_project, project_to_match)
input_project = "test-project"
project_to_match = "test-project"
assert gcp_provider.is_project_matching(input_project, project_to_match)
input_project = "*test*"
project_to_match = "prowler-test-project"
assert gcp_provider.is_project_matching(input_project, project_to_match)
input_project = "prowler-test-project"
project_to_match = "prowler-test"
assert not gcp_provider.is_project_matching(input_project, project_to_match)