feat(compliance): add ENSRD2022 for Azure and GCP (#5746)

This commit is contained in:
Pedro Martín
2024-11-21 09:36:47 +01:00
committed by GitHub
parent 9b0b61ef02
commit 2e20d52030
15 changed files with 5091 additions and 12 deletions

View File

@@ -64,8 +64,8 @@ It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, Fe
| Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/misc/#categories) |
|---|---|---|---|---|
| AWS | 553 | 77 -> `prowler aws --list-services` | 30 -> `prowler aws --list-compliance` | 9 -> `prowler aws --list-categories` |
| GCP | 77 | 13 -> `prowler gcp --list-services` | 2 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`|
| Azure | 138 | 17 -> `prowler azure --list-services` | 3 -> `prowler azure --list-compliance` | 2 -> `prowler azure --list-categories` |
| GCP | 77 | 13 -> `prowler gcp --list-services` | 3 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`|
| Azure | 138 | 17 -> `prowler azure --list-services` | 4 -> `prowler azure --list-compliance` | 2 -> `prowler azure --list-categories` |
| Kubernetes | 83 | 7 -> `prowler kubernetes --list-services` | 1 -> `prowler kubernetes --list-compliance` | 7 -> `prowler kubernetes --list-categories` |
# 💻 Installation

View File

@@ -0,0 +1,36 @@
import warnings
from dashboard.common_methods import get_section_containers_ens
warnings.filterwarnings("ignore")
def get_table(data):
# append the requirements_description to idgrupocontrol
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"] = (
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"]
+ " - "
+ data["REQUIREMENTS_DESCRIPTION"]
)
aux = data[
[
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
return get_section_containers_ens(
aux,
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
)

View File

@@ -0,0 +1,36 @@
import warnings
from dashboard.common_methods import get_section_containers_ens
warnings.filterwarnings("ignore")
def get_table(data):
# append the requirements_description to idgrupocontrol
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"] = (
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"]
+ " - "
+ data["REQUIREMENTS_DESCRIPTION"]
)
aux = data[
[
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
return get_section_containers_ens(
aux,
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
)

View File

@@ -148,6 +148,7 @@ else:
select_account_dropdown_list = ["All"]
# Append to the list the unique values of the columns ACCOUNTID, PROJECTID and SUBSCRIPTIONID if they exist
if "ACCOUNTID" in data.columns:
data["ACCOUNTID"] = data["ACCOUNTID"].astype(str)
select_account_dropdown_list = select_account_dropdown_list + list(
data["ACCOUNTID"].unique()
)
@@ -246,9 +247,11 @@ def display_data(
dfs = []
for file in files:
df = pd.read_csv(
file, sep=";", on_bad_lines="skip", encoding=encoding_format
file, sep=";", on_bad_lines="skip", encoding=encoding_format, dtype=str
)
dfs.append(df.astype(str))
df = df.astype(str).fillna("nan")
df.columns = df.columns.astype(str)
dfs.append(df)
return pd.concat(dfs, ignore_index=True)
data = load_csv_files(files)
@@ -274,17 +277,24 @@ def display_data(
data.rename(columns={"PROJECTID": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Rename the column SUBSCRIPTIONID to ACCOUNTID for Azure
if data.columns.str.contains("SUBSCRIPTIONID").any():
if (
data.columns.str.contains("SUBSCRIPTIONID").any()
and not data.columns.str.contains("ACCOUNTID").any()
):
data.rename(columns={"SUBSCRIPTIONID": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Handle v3 azure cis compliance
if data.columns.str.contains("SUBSCRIPTION").any():
if (
data.columns.str.contains("SUBSCRIPTION").any()
and not data.columns.str.contains("ACCOUNTID").any()
):
data.rename(columns={"SUBSCRIPTION": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Filter ACCOUNT
if account_filter == ["All"]:
updated_cloud_account_values = data["ACCOUNTID"].unique()
elif "All" in account_filter and len(account_filter) > 1:
# Remove 'All' from the list
account_filter.remove("All")
@@ -299,9 +309,11 @@ def display_data(
account_filter_options = list(data["ACCOUNTID"].unique())
account_filter_options = account_filter_options + ["All"]
for item in account_filter_options:
if "nan" in item or item.__class__.__name__ != "str" or item is None:
account_filter_options.remove(item)
account_filter_options = [
item
for item in account_filter_options
if isinstance(item, str) and item.lower() != "nan"
]
# Filter REGION
if region_filter_analytics == ["All"]:

View File

@@ -53,6 +53,8 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp_aws import AWSKISAISMSP
@@ -511,6 +513,20 @@ def prowler():
)
generated_outputs["compliance"].append(mitre_attack)
mitre_attack.batch_write_data_to_file()
elif compliance_name.startswith("ens_"):
# Generate ENS Finding Object
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
ens = AzureENS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
generated_outputs["compliance"].append(ens)
ens.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"
@@ -555,6 +571,20 @@ def prowler():
)
generated_outputs["compliance"].append(mitre_attack)
mitre_attack.batch_write_data_to_file()
elif compliance_name.startswith("ens_"):
# Generate ENS Finding Object
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
ens = GCPENS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
generated_outputs["compliance"].append(ens)
ens.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -30,7 +30,7 @@ def get_ens_table(
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "ENS" and compliance.Provider == "AWS":
if compliance.Framework == "ENS":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
marco_categoria = f"{attribute.Marco}/{attribute.Categoria}"

View File

@@ -0,0 +1,103 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.ens.models import AzureENSModel
from prowler.lib.outputs.finding import Finding
class AzureENS(ComplianceOutput):
"""
This class represents the Azure ENS compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into Azure ENS compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS ENS compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=finding.provider,
Description=compliance.Description,
SubscriptionId=finding.account_name,
Location=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
SubscriptionId="",
Location="",
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
)
self._data.append(compliance_row)

View File

@@ -0,0 +1,103 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.ens.models import GCPENSModel
from prowler.lib.outputs.finding import Finding
class GCPENS(ComplianceOutput):
"""
This class represents the GCP ENS compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into GCP ENS compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS ENS compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=finding.provider,
Description=compliance.Description,
ProjectId=finding.account_uid,
Location=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
ProjectId="",
Location="",
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
)
self._data.append(compliance_row)

View File

@@ -28,3 +28,61 @@ class AWSENSModel(BaseModel):
CheckId: str
Muted: bool
ResourceName: str
class AzureENSModel(BaseModel):
"""
AzureENSModel generates a finding's output in CSV ENS format for Azure.
"""
Provider: str
Description: str
SubscriptionId: str
Location: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_IdGrupoControl: str
Requirements_Attributes_Marco: str
Requirements_Attributes_Categoria: str
Requirements_Attributes_DescripcionControl: str
Requirements_Attributes_Nivel: str
Requirements_Attributes_Tipo: str
Requirements_Attributes_Dimensiones: str
Requirements_Attributes_ModoEjecucion: str
Requirements_Attributes_Dependencias: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str
class GCPENSModel(BaseModel):
"""
GCPENSModel generates a finding's output in CSV ENS format for GCP.
"""
Provider: str
Description: str
ProjectId: str
Location: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_IdGrupoControl: str
Requirements_Attributes_Marco: str
Requirements_Attributes_Categoria: str
Requirements_Attributes_DescripcionControl: str
Requirements_Attributes_Nivel: str
Requirements_Attributes_Tipo: str
Requirements_Attributes_Dimensiones: str
Requirements_Attributes_ModoEjecucion: str
Requirements_Attributes_Dependencias: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str

View File

@@ -654,7 +654,7 @@ class Test_Parser:
def test_checks_parser_wrong_compliance(self):
argument = "--compliance"
framework = "ens_rd2022_azure"
framework = "ens_rd2022_kubernetes"
command = [prowler_command, argument, framework]
with pytest.raises(SystemExit) as wrapped_exit:
_ = self.parser.parse(command)

View File

@@ -0,0 +1,140 @@
from datetime import datetime
from io import StringIO
from freezegun import freeze_time
from mock import patch
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.models import AzureENSModel
from tests.lib.outputs.compliance.fixtures import ENS_RD2022_AZURE
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
class TestAzureENS:
def test_output_transform(self):
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.azure.ct.3"},
provider="azure",
region="global",
),
]
output = AzureENS(findings, ENS_RD2022_AZURE)
output_data = output.data[0]
assert isinstance(output_data, AzureENSModel)
assert output_data.Provider == "azure"
assert output_data.SubscriptionId == "123456789012"
assert output_data.Location == "global"
assert output_data.Description == ENS_RD2022_AZURE.Description
assert output_data.Requirements_Id == ENS_RD2022_AZURE.Requirements[0].Id
assert (
output_data.Requirements_Description
== ENS_RD2022_AZURE.Requirements[0].Description
)
assert (
output_data.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].IdGrupoControl
)
assert (
output_data.Requirements_Attributes_Marco
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Marco
)
assert (
output_data.Requirements_Attributes_Categoria
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Categoria
)
assert (
output_data.Requirements_Attributes_DescripcionControl
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].DescripcionControl
)
assert (
output_data.Requirements_Attributes_Nivel
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Nivel
)
assert (
output_data.Requirements_Attributes_Tipo
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Tipo
)
assert [
output_data.Requirements_Attributes_Dimensiones
] == ENS_RD2022_AZURE.Requirements[0].Attributes[0].Dimensiones
assert (
output_data.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].ModoEjecucion
)
assert output_data.Requirements_Attributes_Dependencias == ""
assert output_data.Status == "PASS"
assert output_data.StatusExtended == ""
assert output_data.ResourceId == ""
assert output_data.ResourceName == ""
assert output_data.CheckId == "test-check-id"
assert output_data.Muted is False
# Test manual check
output_data_manual = output.data[1]
assert output_data_manual.Provider == "azure"
assert output_data_manual.SubscriptionId == ""
assert output_data_manual.Location == ""
assert output_data_manual.Requirements_Id == ENS_RD2022_AZURE.Requirements[1].Id
assert (
output_data_manual.Requirements_Description
== ENS_RD2022_AZURE.Requirements[1].Description
)
assert (
output_data_manual.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].IdGrupoControl
)
assert (
output_data_manual.Requirements_Attributes_Marco
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Marco
)
assert (
output_data_manual.Requirements_Attributes_Categoria
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Categoria
)
assert (
output_data_manual.Requirements_Attributes_DescripcionControl
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].DescripcionControl
)
assert (
output_data_manual.Requirements_Attributes_Nivel
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Nivel
)
assert (
output_data_manual.Requirements_Attributes_Tipo
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Tipo
)
assert [
output_data_manual.Requirements_Attributes_Dimensiones
] == ENS_RD2022_AZURE.Requirements[1].Attributes[0].Dimensiones
assert (
output_data_manual.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].ModoEjecucion
)
assert output_data_manual.Status == "MANUAL"
assert output_data_manual.StatusExtended == "Manual check"
assert output_data_manual.ResourceId == "manual_check"
assert output_data_manual.ResourceName == "Manual check"
assert output_data_manual.CheckId == "manual"
assert output_data_manual.Muted is False
@freeze_time(datetime.now())
def test_batch_write_data_to_file(self):
mock_file = StringIO()
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.azure.ct.3"},
provider="azure",
region="global",
),
]
output = AzureENS(findings, ENS_RD2022_AZURE)
output._file_descriptor = mock_file
with patch.object(mock_file, "close", return_value=None):
output.batch_write_data_to_file()
mock_file.seek(0)
content = mock_file.read()
expected_csv = f"PROVIDER;DESCRIPTION;SUBSCRIPTIONID;LOCATION;ASSESSMENTDATE;REQUIREMENTS_ID;REQUIREMENTS_DESCRIPTION;REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL;REQUIREMENTS_ATTRIBUTES_MARCO;REQUIREMENTS_ATTRIBUTES_CATEGORIA;REQUIREMENTS_ATTRIBUTES_DESCRIPCIONCONTROL;REQUIREMENTS_ATTRIBUTES_NIVEL;REQUIREMENTS_ATTRIBUTES_TIPO;REQUIREMENTS_ATTRIBUTES_DIMENSIONES;REQUIREMENTS_ATTRIBUTES_MODOEJECUCION;REQUIREMENTS_ATTRIBUTES_DEPENDENCIAS;STATUS;STATUSEXTENDED;RESOURCEID;CHECKID;MUTED;RESOURCENAME\r\nazure;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;123456789012;global;{datetime.now()};op.exp.8.azure.ct.3;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;PASS;;;test-check-id;False;\r\nazure;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;;;{datetime.now()};op.exp.8.azure.ct.4;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;MANUAL;Manual check;manual_check;manual;False;Manual check\r\n"
assert content == expected_csv

View File

@@ -0,0 +1,140 @@
from datetime import datetime
from io import StringIO
from freezegun import freeze_time
from mock import patch
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.ens.models import GCPENSModel
from tests.lib.outputs.compliance.fixtures import ENS_RD2022_GCP
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
class TestGCPENS:
def test_output_transform(self):
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.gcp.ct.3"},
provider="gcp",
region="global",
),
]
output = GCPENS(findings, ENS_RD2022_GCP)
output_data = output.data[0]
assert isinstance(output_data, GCPENSModel)
assert output_data.Provider == "gcp"
assert output_data.ProjectId == "123456789012"
assert output_data.Location == "global"
assert output_data.Description == ENS_RD2022_GCP.Description
assert output_data.Requirements_Id == ENS_RD2022_GCP.Requirements[0].Id
assert (
output_data.Requirements_Description
== ENS_RD2022_GCP.Requirements[0].Description
)
assert (
output_data.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_GCP.Requirements[0].Attributes[0].IdGrupoControl
)
assert (
output_data.Requirements_Attributes_Marco
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Marco
)
assert (
output_data.Requirements_Attributes_Categoria
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Categoria
)
assert (
output_data.Requirements_Attributes_DescripcionControl
== ENS_RD2022_GCP.Requirements[0].Attributes[0].DescripcionControl
)
assert (
output_data.Requirements_Attributes_Nivel
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Nivel
)
assert (
output_data.Requirements_Attributes_Tipo
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Tipo
)
assert [
output_data.Requirements_Attributes_Dimensiones
] == ENS_RD2022_GCP.Requirements[0].Attributes[0].Dimensiones
assert (
output_data.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_GCP.Requirements[0].Attributes[0].ModoEjecucion
)
assert output_data.Requirements_Attributes_Dependencias == ""
assert output_data.Status == "PASS"
assert output_data.StatusExtended == ""
assert output_data.ResourceId == ""
assert output_data.ResourceName == ""
assert output_data.CheckId == "test-check-id"
assert output_data.Muted is False
# Test manual check
output_data_manual = output.data[1]
assert output_data_manual.Provider == "gcp"
assert output_data_manual.ProjectId == ""
assert output_data_manual.Location == ""
assert output_data_manual.Requirements_Id == ENS_RD2022_GCP.Requirements[1].Id
assert (
output_data_manual.Requirements_Description
== ENS_RD2022_GCP.Requirements[1].Description
)
assert (
output_data_manual.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_GCP.Requirements[1].Attributes[0].IdGrupoControl
)
assert (
output_data_manual.Requirements_Attributes_Marco
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Marco
)
assert (
output_data_manual.Requirements_Attributes_Categoria
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Categoria
)
assert (
output_data_manual.Requirements_Attributes_DescripcionControl
== ENS_RD2022_GCP.Requirements[1].Attributes[0].DescripcionControl
)
assert (
output_data_manual.Requirements_Attributes_Nivel
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Nivel
)
assert (
output_data_manual.Requirements_Attributes_Tipo
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Tipo
)
assert [
output_data_manual.Requirements_Attributes_Dimensiones
] == ENS_RD2022_GCP.Requirements[1].Attributes[0].Dimensiones
assert (
output_data_manual.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_GCP.Requirements[1].Attributes[0].ModoEjecucion
)
assert output_data_manual.Status == "MANUAL"
assert output_data_manual.StatusExtended == "Manual check"
assert output_data_manual.ResourceId == "manual_check"
assert output_data_manual.ResourceName == "Manual check"
assert output_data_manual.CheckId == "manual"
assert output_data_manual.Muted is False
@freeze_time(datetime.now())
def test_batch_write_data_to_file(self):
mock_file = StringIO()
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.gcp.ct.3"},
provider="gcp",
region="global",
),
]
output = GCPENS(findings, ENS_RD2022_GCP)
output._file_descriptor = mock_file
with patch.object(mock_file, "close", return_value=None):
output.batch_write_data_to_file()
mock_file.seek(0)
content = mock_file.read()
expected_csv = f"PROVIDER;DESCRIPTION;PROJECTID;LOCATION;ASSESSMENTDATE;REQUIREMENTS_ID;REQUIREMENTS_DESCRIPTION;REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL;REQUIREMENTS_ATTRIBUTES_MARCO;REQUIREMENTS_ATTRIBUTES_CATEGORIA;REQUIREMENTS_ATTRIBUTES_DESCRIPCIONCONTROL;REQUIREMENTS_ATTRIBUTES_NIVEL;REQUIREMENTS_ATTRIBUTES_TIPO;REQUIREMENTS_ATTRIBUTES_DIMENSIONES;REQUIREMENTS_ATTRIBUTES_MODOEJECUCION;REQUIREMENTS_ATTRIBUTES_DEPENDENCIAS;STATUS;STATUSEXTENDED;RESOURCEID;CHECKID;MUTED;RESOURCENAME\r\ngcp;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;123456789012;global;{datetime.now()};op.exp.8.gcp.ct.3;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;PASS;;;test-check-id;False;\r\ngcp;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;;;{datetime.now()};op.exp.8.gcp.ct.4;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;MANUAL;Manual check;manual_check;manual;False;Manual check\r\n"
assert content == expected_csv

View File

@@ -8,11 +8,11 @@ from prowler.lib.check.compliance_models import (
ENS_Requirement_Attribute_Tipos,
Generic_Compliance_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
KISA_ISMSP_Requirement_Attribute,
Mitre_Requirement,
Mitre_Requirement_Attribute_AWS,
Mitre_Requirement_Attribute_Azure,
Mitre_Requirement_Attribute_GCP,
KISA_ISMSP_Requirement_Attribute,
)
CIS_1_4_AWS_NAME = "cis_1.4_aws"
@@ -469,6 +469,100 @@ ENS_RD2022_AWS = Compliance(
),
],
)
ENS_RD2022_AZURE_NAME = "ens_rd2022_azure"
ENS_RD2022_AZURE = Compliance(
Framework="ENS",
Provider="Azure",
Version="RD2022",
Description="The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.",
Requirements=[
Compliance_Requirement(
Id="op.exp.8.azure.ct.3",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.azure.ct.4",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=[],
),
],
)
ENS_RD2022_GCP_NAME = "ens_rd2022_gcp"
ENS_RD2022_GCP = Compliance(
Framework="ENS",
Provider="GCP",
Version="RD2022",
Description="The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.",
Requirements=[
Compliance_Requirement(
Id="op.exp.8.gcp.ct.3",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.gcp.ct.4",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=[],
),
],
)
NOT_PRESENT_COMPLIANCE_NAME = "not_present_compliance_name"
NOT_PRESENT_COMPLIANCE = Compliance(
Framework="NOT_EXISTENT",