feat(compliance): add Prowler ThreatScore for the AlibabaCloud provider (#9511)

This commit is contained in:
Pedro Martín
2025-12-19 09:36:42 +01:00
committed by GitHub
parent ec32be2f1d
commit 9bf3702d71
7 changed files with 1327 additions and 8 deletions

View File

@@ -0,0 +1,28 @@
import warnings
from dashboard.common_methods import get_section_containers_threatscore
warnings.filterwarnings("ignore")
def get_table(data):
aux = data[
[
"REQUIREMENTS_ID",
"REQUIREMENTS_DESCRIPTION",
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
].copy()
return get_section_containers_threatscore(
aux,
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"REQUIREMENTS_ID",
)

View File

@@ -407,9 +407,11 @@ def display_data(
compliance_module = importlib.import_module(
f"dashboard.compliance.{current}"
)
data = data.drop_duplicates(
subset=["CHECKID", "STATUS", "MUTED", "RESOURCEID", "STATUSEXTENDED"]
)
# Build subset list based on available columns
dedup_columns = ["CHECKID", "STATUS", "RESOURCEID", "STATUSEXTENDED"]
if "MUTED" in data.columns:
dedup_columns.insert(2, "MUTED")
data = data.drop_duplicates(subset=dedup_columns)
if "threatscore" in analytics_input:
data = get_threatscore_mean_by_pillar(data)
@@ -652,6 +654,7 @@ def get_table(current_compliance, table):
def get_threatscore_mean_by_pillar(df):
score_per_pillar = {}
max_score_per_pillar = {}
counted_findings_per_pillar = {}
for _, row in df.iterrows():
pillar = (
@@ -663,6 +666,18 @@ def get_threatscore_mean_by_pillar(df):
if pillar not in score_per_pillar:
score_per_pillar[pillar] = 0
max_score_per_pillar[pillar] = 0
counted_findings_per_pillar[pillar] = set()
# Skip muted findings for score calculation
is_muted = "MUTED" in df.columns and row.get("MUTED") == "True"
if is_muted:
continue
# Create unique finding identifier to avoid counting duplicates
finding_id = f"{row.get('CHECKID', '')}_{row.get('RESOURCEID', '')}"
if finding_id in counted_findings_per_pillar[pillar]:
continue
counted_findings_per_pillar[pillar].add(finding_id)
level_of_risk = pd.to_numeric(
row["REQUIREMENTS_ATTRIBUTES_LEVELOFRISK"], errors="coerce"
@@ -706,6 +721,10 @@ def get_table_prowler_threatscore(df):
score_per_pillar = {}
max_score_per_pillar = {}
pillars = {}
counted_findings_per_pillar = {}
counted_pass = set()
counted_fail = set()
counted_muted = set()
df_copy = df.copy()
@@ -720,6 +739,24 @@ def get_table_prowler_threatscore(df):
pillars[pillar] = {"FAIL": 0, "PASS": 0, "MUTED": 0}
score_per_pillar[pillar] = 0
max_score_per_pillar[pillar] = 0
counted_findings_per_pillar[pillar] = set()
# Create unique finding identifier
finding_id = f"{row.get('CHECKID', '')}_{row.get('RESOURCEID', '')}"
# Check if muted
is_muted = "MUTED" in df_copy.columns and row.get("MUTED") == "True"
# Count muted findings (separate from score calculation)
if is_muted and finding_id not in counted_muted:
counted_muted.add(finding_id)
pillars[pillar]["MUTED"] += 1
continue # Skip muted findings for score calculation
# Skip if already counted for this pillar
if finding_id in counted_findings_per_pillar[pillar]:
continue
counted_findings_per_pillar[pillar].add(finding_id)
level_of_risk = pd.to_numeric(
row["REQUIREMENTS_ATTRIBUTES_LEVELOFRISK"], errors="coerce"
@@ -738,13 +775,14 @@ def get_table_prowler_threatscore(df):
max_score_per_pillar[pillar] += level_of_risk * weight
if row["STATUS"] == "PASS":
pillars[pillar]["PASS"] += 1
if finding_id not in counted_pass:
counted_pass.add(finding_id)
pillars[pillar]["PASS"] += 1
score_per_pillar[pillar] += level_of_risk * weight
elif row["STATUS"] == "FAIL":
pillars[pillar]["FAIL"] += 1
if "MUTED" in row and row["MUTED"] == "True":
pillars[pillar]["MUTED"] += 1
if finding_id not in counted_fail:
counted_fail.add(finding_id)
pillars[pillar]["FAIL"] += 1
result_df = []

View File

@@ -2,6 +2,13 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.17.0] (Prowler UNRELEASED)
### Added
- Add Prowler ThreatScore for the Alibaba Cloud provider [(#9511)](https://github.com/prowler-cloud/prowler/pull/9511)
---
## [5.16.0] (Prowler v5.16.0)
### Added

View File

@@ -83,6 +83,9 @@ from prowler.lib.outputs.compliance.mitre_attack.mitre_attack_azure import (
AzureMitreAttack,
)
from prowler.lib.outputs.compliance.mitre_attack.mitre_attack_gcp import GCPMitreAttack
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore_alibaba import (
ProwlerThreatScoreAlibaba,
)
from prowler.lib.outputs.compliance.prowler_threatscore.prowler_threatscore_aws import (
ProwlerThreatScoreAWS,
)
@@ -1039,6 +1042,18 @@ def prowler():
)
generated_outputs["compliance"].append(cis)
cis.batch_write_data_to_file()
elif compliance_name == "prowler_threatscore_alibabacloud":
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
prowler_threatscore = ProwlerThreatScoreAlibaba(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(prowler_threatscore)
prowler_threatscore.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"

File diff suppressed because it is too large Load Diff

View File

@@ -146,3 +146,29 @@ class ProwlerThreatScoreKubernetesModel(BaseModel):
Muted: bool
Framework: str
Name: str
class ProwlerThreatScoreAlibabaModel(BaseModel):
"""
ProwlerThreatScoreAlibabaModel generates a finding's output in Alibaba Cloud Prowler ThreatScore Compliance format.
"""
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Title: str
Requirements_Attributes_Section: str
Requirements_Attributes_SubSection: Optional[str] = None
Requirements_Attributes_AttributeDescription: str
Requirements_Attributes_AdditionalInformation: str
Requirements_Attributes_LevelOfRisk: int
Requirements_Attributes_Weight: int
Status: str
StatusExtended: str
ResourceId: str
ResourceName: str
CheckId: str

View File

@@ -0,0 +1,98 @@
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.prowler_threatscore.models import (
ProwlerThreatScoreAlibabaModel,
)
from prowler.lib.outputs.finding import Finding
class ProwlerThreatScoreAlibaba(ComplianceOutput):
"""
This class represents the Alibaba Cloud Prowler ThreatScore compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into Alibaba Cloud Prowler ThreatScore compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into Alibaba Cloud Prowler ThreatScore compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAlibabaModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Title=attribute.Title,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_SubSection=attribute.SubSection,
Requirements_Attributes_AttributeDescription=attribute.AttributeDescription,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_LevelOfRisk=attribute.LevelOfRisk,
Requirements_Attributes_Weight=attribute.Weight,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAlibabaModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
Region="",
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Title=attribute.Title,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_SubSection=attribute.SubSection,
Requirements_Attributes_AttributeDescription=attribute.AttributeDescription,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_LevelOfRisk=attribute.LevelOfRisk,
Requirements_Attributes_Weight=attribute.Weight,
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)