chore(mitre gcp): add mitre mapping for gcp (#3899)

Co-authored-by: pedrooot <pedromarting3@gmail.com>
Co-authored-by: Sergio <sergio@prowler.com>
This commit is contained in:
Nacho Rivera
2024-05-06 11:10:44 +02:00
committed by GitHub
parent 3162f6cd92
commit 808d87a0dd
10 changed files with 2458 additions and 18 deletions

View File

@@ -0,0 +1,23 @@
import warnings
from dashboard.common_methods import get_section_containers_format2
warnings.filterwarnings("ignore")
def get_table(data):
aux = data[
[
"REQUIREMENTS_ID",
"REQUIREMENTS_SUBTECHNIQUES",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
].copy()
return get_section_containers_format2(
aux, "REQUIREMENTS_ID", "REQUIREMENTS_SUBTECHNIQUES"
)

View File

@@ -263,7 +263,7 @@ def display_data(
# Rename the column PROJECTID to ACCOUNTID for GCP
if data.columns.str.contains("PROJECTID").any():
data.rename(columns={"PROJECTID": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Rename the column SUBSCRIPTIONID to ACCOUNTID for Azure
if data.columns.str.contains("SUBSCRIPTIONID").any():
data.rename(columns={"SUBSCRIPTIONID": "ACCOUNTID"}, inplace=True)

View File

@@ -124,6 +124,7 @@ def prowler():
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
# Update checks metadata if the --custom-checks-metadata-file is present
custom_checks_metadata = None
if custom_checks_metadata_file:

View File

@@ -397,7 +397,7 @@
"Id": "2.13",
"Description": "GCP Cloud Asset Inventory is services that provides a historical view of GCP resources and IAM policies through a time-series database. The information recorded includes metadata on Google Cloud resources, metadata on policies set on Google Cloud projects or resources, and runtime information gathered within a Google Cloud resource.",
"Checks": [
"serviceusage_cloudasset_inventory_enabled"
"iam_cloud_asset_inventory_enabled"
],
"Attributes": [
{

File diff suppressed because it is too large Load Diff

View File

@@ -119,7 +119,7 @@ class ISO27001_2013_Requirement_Attribute(BaseModel):
Check_Summary: str
# MITRE Requirement Attribute
# MITRE Requirement Attribute for AWS
class Mitre_Requirement_Attribute_AWS(BaseModel):
"""MITRE Requirement Attribute"""
@@ -129,7 +129,7 @@ class Mitre_Requirement_Attribute_AWS(BaseModel):
Comment: str
# MITRE Requirement Attribute
# MITRE Requirement Attribute for Azure
class Mitre_Requirement_Attribute_Azure(BaseModel):
"""MITRE Requirement Attribute"""
@@ -139,6 +139,16 @@ class Mitre_Requirement_Attribute_Azure(BaseModel):
Comment: str
# MITRE Requirement Attribute for GCP
class Mitre_Requirement_Attribute_GCP(BaseModel):
"""MITRE Requirement Attribute"""
GCPService: str
Category: str
Value: str
Comment: str
# MITRE Requirement
class Mitre_Requirement(BaseModel):
"""Mitre_Requirement holds the model for every MITRE requirement"""
@@ -151,7 +161,9 @@ class Mitre_Requirement(BaseModel):
Platforms: list[str]
TechniqueURL: str
Attributes: Union[
list[Mitre_Requirement_Attribute_AWS], list[Mitre_Requirement_Attribute_Azure]
list[Mitre_Requirement_Attribute_AWS],
list[Mitre_Requirement_Attribute_Azure],
list[Mitre_Requirement_Attribute_GCP],
]
Checks: list[str]
@@ -184,7 +196,12 @@ class Compliance_Base_Model(BaseModel):
Provider: str
Version: Optional[str]
Description: str
Requirements: list[Union[Mitre_Requirement, Compliance_Requirement]]
Requirements: list[
Union[
Mitre_Requirement,
Compliance_Requirement,
]
]
@root_validator(pre=True)
# noqa: F841 - since vulture raises unused variable 'cls'

View File

@@ -39,6 +39,10 @@ def write_compliance_row_mitre_attack(file_descriptors, finding, compliance, pro
attributes_services = ", ".join(
attribute.AzureService for attribute in requirement.Attributes
)
elif compliance.Provider == "GCP":
attributes_services = ", ".join(
attribute.GCPService for attribute in requirement.Attributes
)
requirement_description = requirement.Description
requirement_id = requirement.Id
requirement_name = requirement.Name
@@ -82,6 +86,8 @@ def write_compliance_row_mitre_attack(file_descriptors, finding, compliance, pro
common_data["SubscriptionId"] = unroll_list(
provider.identity.subscriptions
)
elif compliance.Provider == "GCP":
common_data["ProjectId"] = unroll_list(provider.projects)
compliance_row = mitre_attack_model(**common_data)
@@ -148,10 +154,9 @@ def get_mitre_attack_table(
mitre_compliance_table["Status"].append(
f"{Fore.GREEN}PASS({tactics[tactic]['PASS']}){Style.RESET_ALL}"
)
if tactics[tactic]["Muted"] > 0:
mitre_compliance_table["Muted"].append(
f"{orange_color}{tactics[tactic]['Muted']}{Style.RESET_ALL}"
)
mitre_compliance_table["Muted"].append(
f"{orange_color}{tactics[tactic]['Muted']}{Style.RESET_ALL}"
)
if (
len(fail_count) + len(pass_count) + len(muted_count) > 1
): # If there are no resources, don't print the compliance table

View File

@@ -54,3 +54,30 @@ class MitreAttackAzure(BaseModel):
ResourceId: str
CheckId: str
Muted: bool
class MitreAttackGCP(BaseModel):
"""
MitreAttackGCP generates a finding's output in CSV MITRE ATTACK format for AWS.
"""
Provider: str
Description: str
ProjectId: str
AssessmentDate: str
Requirements_Id: str
Requirements_Name: str
Requirements_Description: str
Requirements_Tactics: str
Requirements_SubTechniques: str
Requirements_Platforms: str
Requirements_TechniqueURL: str
Requirements_Attributes_Services: str
Requirements_Attributes_Categories: str
Requirements_Attributes_Values: str
Requirements_Attributes_Comments: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool

View File

@@ -12,6 +12,7 @@ from prowler.lib.outputs.common_models import FindingOutput
from prowler.lib.outputs.compliance.mitre_attack.models import (
MitreAttackAWS,
MitreAttackAzure,
MitreAttackGCP,
)
from prowler.lib.outputs.compliance.models import (
Check_Output_CSV_AWS_CIS,
@@ -88,6 +89,13 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, provi
filename, output_mode, Check_Output_CSV_GCP_CIS
)
file_descriptors.update({output_mode: file_descriptor})
elif output_mode == "mitre_attack_gcp":
file_descriptor = initialize_file_descriptor(
filename,
output_mode,
MitreAttackGCP,
)
file_descriptors.update({output_mode: file_descriptor})
else:
file_descriptor = initialize_file_descriptor(
filename,
@@ -192,14 +200,6 @@ def fill_file_descriptors(output_modes, output_directory, output_filename, provi
Check_Output_CSV_Generic_Compliance,
)
file_descriptors.update({output_mode: file_descriptor})
elif provider.type == "azure":
filename = f"{output_directory}/compliance/{output_filename}_{output_mode}{csv_file_suffix}"
file_descriptor = initialize_file_descriptor(
filename,
output_mode,
Check_Output_CSV_Generic_Compliance,
)
file_descriptors.update({output_mode: file_descriptor})
except Exception as error:
logger.error(