mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-03 14:06:23 +00:00
Compare commits
3 Commits
docs/sso-g
...
poc-gha-ia
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bf816ee42 | ||
|
|
42ab40d079 | ||
|
|
2ce706e474 |
@@ -18,6 +18,7 @@ from prowler.config.config import (
|
||||
json_asff_file_suffix,
|
||||
json_ocsf_file_suffix,
|
||||
orange_color,
|
||||
sarif_file_suffix,
|
||||
)
|
||||
from prowler.lib.banner import print_banner
|
||||
from prowler.lib.check.check import (
|
||||
@@ -69,11 +70,11 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_github import GithubCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_googleworkspace import GoogleWorkspaceCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
|
||||
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
|
||||
GoogleWorkspaceCISASCuBA,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
|
||||
from prowler.lib.outputs.compliance.compliance import display_compliance_table
|
||||
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
|
||||
@@ -122,6 +123,7 @@ from prowler.lib.outputs.html.html import HTML
|
||||
from prowler.lib.outputs.ocsf.ingestion import send_ocsf_to_api
|
||||
from prowler.lib.outputs.ocsf.ocsf import OCSF
|
||||
from prowler.lib.outputs.outputs import extract_findings_statistics, report
|
||||
from prowler.lib.outputs.sarif.sarif import SARIF
|
||||
from prowler.lib.outputs.slack.slack import Slack
|
||||
from prowler.lib.outputs.summary_table import display_summary_table
|
||||
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
|
||||
@@ -546,6 +548,13 @@ def prowler():
|
||||
html_output.batch_write_data_to_file(
|
||||
provider=global_provider, stats=stats
|
||||
)
|
||||
if mode == "sarif":
|
||||
sarif_output = SARIF(
|
||||
findings=finding_outputs,
|
||||
file_path=f"{filename}{sarif_file_suffix}",
|
||||
)
|
||||
generated_outputs["regular"].append(sarif_output)
|
||||
sarif_output.batch_write_data_to_file()
|
||||
|
||||
if getattr(args, "push_to_cloud", False):
|
||||
if not ocsf_output or not getattr(ocsf_output, "file_path", None):
|
||||
|
||||
@@ -110,6 +110,7 @@ json_file_suffix = ".json"
|
||||
json_asff_file_suffix = ".asff.json"
|
||||
json_ocsf_file_suffix = ".ocsf.json"
|
||||
html_file_suffix = ".html"
|
||||
sarif_file_suffix = ".sarif"
|
||||
default_config_file_path = (
|
||||
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
|
||||
)
|
||||
@@ -120,7 +121,7 @@ default_redteam_config_file_path = (
|
||||
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"
|
||||
)
|
||||
encoding_format_utf_8 = "utf-8"
|
||||
available_output_formats = ["csv", "json-asff", "json-ocsf", "html"]
|
||||
available_output_formats = ["csv", "json-asff", "json-ocsf", "html", "sarif"]
|
||||
|
||||
# Prowler Cloud API settings
|
||||
cloud_api_base_url = os.getenv("PROWLER_CLOUD_API_BASE_URL", "https://api.prowler.com")
|
||||
|
||||
@@ -354,6 +354,9 @@ class Finding(BaseModel):
|
||||
check_output, "resource_line_range", ""
|
||||
)
|
||||
output_data["framework"] = check_output.check_metadata.ServiceName
|
||||
output_data["raw"] = {
|
||||
"resource_line_range": output_data.get("resource_line_range", ""),
|
||||
}
|
||||
|
||||
elif provider.type == "llm":
|
||||
output_data["auth_method"] = provider.auth_method
|
||||
|
||||
0
prowler/lib/outputs/sarif/__init__.py
Normal file
0
prowler/lib/outputs/sarif/__init__.py
Normal file
155
prowler/lib/outputs/sarif/sarif.py
Normal file
155
prowler/lib/outputs/sarif/sarif.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from json import dump
|
||||
from typing import List
|
||||
|
||||
from prowler.config.config import prowler_version
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.finding import Finding
|
||||
from prowler.lib.outputs.output import Output
|
||||
|
||||
SARIF_SCHEMA_URL = "https://json.schemastore.org/sarif-2.1.0.json"
|
||||
SARIF_VERSION = "2.1.0"
|
||||
|
||||
SEVERITY_TO_SARIF_LEVEL = {
|
||||
"critical": "error",
|
||||
"high": "error",
|
||||
"medium": "warning",
|
||||
"low": "note",
|
||||
"informational": "note",
|
||||
}
|
||||
|
||||
SEVERITY_TO_SECURITY_SEVERITY = {
|
||||
"critical": "9.0",
|
||||
"high": "7.0",
|
||||
"medium": "4.0",
|
||||
"low": "2.0",
|
||||
"informational": "0.0",
|
||||
}
|
||||
|
||||
|
||||
class SARIF(Output):
|
||||
"""Generates SARIF 2.1.0 output compatible with GitHub Code Scanning."""
|
||||
|
||||
def transform(self, findings: List[Finding]) -> None:
|
||||
rules = {}
|
||||
results = []
|
||||
|
||||
for finding in findings:
|
||||
if finding.status != "FAIL":
|
||||
continue
|
||||
|
||||
check_id = finding.metadata.CheckID
|
||||
severity = finding.metadata.Severity.lower()
|
||||
|
||||
if check_id not in rules:
|
||||
rule = {
|
||||
"id": check_id,
|
||||
"name": check_id,
|
||||
"shortDescription": {"text": finding.metadata.CheckTitle},
|
||||
"fullDescription": {
|
||||
"text": finding.metadata.Description or check_id
|
||||
},
|
||||
"help": {
|
||||
"text": finding.metadata.Remediation.Recommendation.Text
|
||||
or finding.metadata.Description
|
||||
or check_id,
|
||||
},
|
||||
"defaultConfiguration": {
|
||||
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
|
||||
},
|
||||
"properties": {
|
||||
"tags": [
|
||||
"security",
|
||||
f"prowler/{finding.metadata.Provider}",
|
||||
f"severity/{severity}",
|
||||
],
|
||||
"security-severity": SEVERITY_TO_SECURITY_SEVERITY.get(
|
||||
severity, "0.0"
|
||||
),
|
||||
},
|
||||
}
|
||||
if finding.metadata.RelatedUrl:
|
||||
rule["helpUri"] = finding.metadata.RelatedUrl
|
||||
rules[check_id] = rule
|
||||
|
||||
rule_index = list(rules.keys()).index(check_id)
|
||||
result = {
|
||||
"ruleId": check_id,
|
||||
"ruleIndex": rule_index,
|
||||
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
|
||||
"message": {
|
||||
"text": finding.status_extended or finding.metadata.CheckTitle
|
||||
},
|
||||
}
|
||||
|
||||
location = self._build_location(finding)
|
||||
if location:
|
||||
result["locations"] = [location]
|
||||
|
||||
results.append(result)
|
||||
|
||||
sarif_document = {
|
||||
"$schema": SARIF_SCHEMA_URL,
|
||||
"version": SARIF_VERSION,
|
||||
"runs": [
|
||||
{
|
||||
"tool": {
|
||||
"driver": {
|
||||
"name": "Prowler",
|
||||
"version": prowler_version,
|
||||
"informationUri": "https://prowler.com",
|
||||
"rules": list(rules.values()),
|
||||
},
|
||||
},
|
||||
"results": results,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
self._data = sarif_document
|
||||
|
||||
def batch_write_data_to_file(self) -> None:
|
||||
try:
|
||||
if (
|
||||
getattr(self, "_file_descriptor", None)
|
||||
and not self._file_descriptor.closed
|
||||
and self._data
|
||||
):
|
||||
dump(self._data, self._file_descriptor, indent=2)
|
||||
self._file_descriptor.close()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_location(finding: Finding) -> dict:
|
||||
"""Build a SARIF physicalLocation from a Finding.
|
||||
|
||||
Uses resource_name as the artifact URI and resource_line_range
|
||||
(stored in finding.raw for IaC findings) for region info.
|
||||
"""
|
||||
if not finding.resource_name:
|
||||
return {}
|
||||
|
||||
location = {
|
||||
"physicalLocation": {
|
||||
"artifactLocation": {
|
||||
"uri": finding.resource_name,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
line_range = finding.raw.get("resource_line_range", "")
|
||||
if line_range and ":" in line_range:
|
||||
parts = line_range.split(":")
|
||||
try:
|
||||
start_line = int(parts[0])
|
||||
end_line = int(parts[1])
|
||||
location["physicalLocation"]["region"] = {
|
||||
"startLine": start_line,
|
||||
"endLine": end_line,
|
||||
}
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return location
|
||||
@@ -9,6 +9,7 @@ from prowler.config.config import (
|
||||
json_asff_file_suffix,
|
||||
json_ocsf_file_suffix,
|
||||
orange_color,
|
||||
sarif_file_suffix,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.github.models import GithubAppIdentityInfo, GithubIdentityInfo
|
||||
@@ -207,6 +208,10 @@ def display_summary_table(
|
||||
print(
|
||||
f" - HTML: {output_directory}/{output_filename}{html_file_suffix}"
|
||||
)
|
||||
if "sarif" in output_options.output_modes:
|
||||
print(
|
||||
f" - SARIF: {output_directory}/{output_filename}{sarif_file_suffix}"
|
||||
)
|
||||
|
||||
else:
|
||||
print(
|
||||
|
||||
0
tests/lib/outputs/sarif/__init__.py
Normal file
0
tests/lib/outputs/sarif/__init__.py
Normal file
257
tests/lib/outputs/sarif/sarif_test.py
Normal file
257
tests/lib/outputs/sarif/sarif_test.py
Normal file
@@ -0,0 +1,257 @@
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.lib.outputs.sarif.sarif import SARIF, SARIF_SCHEMA_URL, SARIF_VERSION
|
||||
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
|
||||
|
||||
|
||||
class TestSARIF:
|
||||
def test_transform_fail_finding(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="S3 bucket is not encrypted",
|
||||
severity="high",
|
||||
resource_name="main.tf",
|
||||
service_name="s3",
|
||||
check_id="s3_encryption_check",
|
||||
check_title="S3 Bucket Encryption",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
assert sarif.data["$schema"] == SARIF_SCHEMA_URL
|
||||
assert sarif.data["version"] == SARIF_VERSION
|
||||
assert len(sarif.data["runs"]) == 1
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert run["tool"]["driver"]["name"] == "Prowler"
|
||||
assert len(run["tool"]["driver"]["rules"]) == 1
|
||||
assert len(run["results"]) == 1
|
||||
|
||||
rule = run["tool"]["driver"]["rules"][0]
|
||||
assert rule["id"] == "s3_encryption_check"
|
||||
assert rule["shortDescription"]["text"] == "S3 Bucket Encryption"
|
||||
assert rule["defaultConfiguration"]["level"] == "error"
|
||||
assert rule["properties"]["security-severity"] == "7.0"
|
||||
|
||||
result = run["results"][0]
|
||||
assert result["ruleId"] == "s3_encryption_check"
|
||||
assert result["ruleIndex"] == 0
|
||||
assert result["level"] == "error"
|
||||
assert result["message"]["text"] == "S3 bucket is not encrypted"
|
||||
|
||||
def test_transform_pass_finding_excluded(self):
|
||||
finding = generate_finding_output(status="PASS", severity="high")
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 0
|
||||
assert len(run["tool"]["driver"]["rules"]) == 0
|
||||
|
||||
def test_transform_muted_finding_excluded(self):
|
||||
finding = generate_finding_output(status="FAIL", severity="high", muted=True)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 1
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"severity,expected_level,expected_security_severity",
|
||||
[
|
||||
("critical", "error", "9.0"),
|
||||
("high", "error", "7.0"),
|
||||
("medium", "warning", "4.0"),
|
||||
("low", "note", "2.0"),
|
||||
("informational", "note", "0.0"),
|
||||
],
|
||||
)
|
||||
def test_transform_severity_mapping(
|
||||
self, severity, expected_level, expected_security_severity
|
||||
):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
severity=severity,
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
result = run["results"][0]
|
||||
rule = run["tool"]["driver"]["rules"][0]
|
||||
|
||||
assert result["level"] == expected_level
|
||||
assert rule["defaultConfiguration"]["level"] == expected_level
|
||||
assert rule["properties"]["security-severity"] == expected_security_severity
|
||||
|
||||
def test_transform_multiple_findings_dedup_rules(self):
|
||||
findings = [
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="file1.tf",
|
||||
status_extended="Finding in file1",
|
||||
),
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="file2.tf",
|
||||
status_extended="Finding in file2",
|
||||
),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["tool"]["driver"]["rules"]) == 1
|
||||
assert len(run["results"]) == 2
|
||||
assert run["results"][0]["ruleIndex"] == 0
|
||||
assert run["results"][1]["ruleIndex"] == 0
|
||||
|
||||
def test_transform_multiple_different_rules(self):
|
||||
findings = [
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
service_name="alpha",
|
||||
check_id="alpha_check_one",
|
||||
status_extended="Finding A",
|
||||
),
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
service_name="beta",
|
||||
check_id="beta_check_two",
|
||||
status_extended="Finding B",
|
||||
),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["tool"]["driver"]["rules"]) == 2
|
||||
assert run["results"][0]["ruleIndex"] == 0
|
||||
assert run["results"][1]["ruleIndex"] == 1
|
||||
|
||||
def test_transform_location_with_line_range(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="modules/s3/main.tf",
|
||||
)
|
||||
finding.raw = {"resource_line_range": "10:25"}
|
||||
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
location = result["locations"][0]["physicalLocation"]
|
||||
assert location["artifactLocation"]["uri"] == "modules/s3/main.tf"
|
||||
assert location["region"]["startLine"] == 10
|
||||
assert location["region"]["endLine"] == 25
|
||||
|
||||
def test_transform_location_without_line_range(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="main.tf",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
location = result["locations"][0]["physicalLocation"]
|
||||
assert location["artifactLocation"]["uri"] == "main.tf"
|
||||
assert "region" not in location
|
||||
|
||||
def test_transform_no_resource_name(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
assert "locations" not in result
|
||||
|
||||
def test_batch_write_data_to_file(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="test finding",
|
||||
resource_name="main.tf",
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sarif", delete=False
|
||||
) as tmp:
|
||||
tmp_path = tmp.name
|
||||
|
||||
sarif = SARIF(
|
||||
findings=[finding],
|
||||
file_path=tmp_path,
|
||||
)
|
||||
sarif.batch_write_data_to_file()
|
||||
|
||||
with open(tmp_path) as f:
|
||||
content = json.load(f)
|
||||
|
||||
assert content["$schema"] == SARIF_SCHEMA_URL
|
||||
assert content["version"] == SARIF_VERSION
|
||||
assert len(content["runs"][0]["results"]) == 1
|
||||
|
||||
def test_sarif_schema_structure(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
severity="critical",
|
||||
resource_name="infra/main.tf",
|
||||
service_name="iac",
|
||||
check_id="iac_misconfig_check",
|
||||
check_title="IaC Misconfiguration",
|
||||
description="Checks for misconfigurations",
|
||||
remediation_recommendation_text="Fix the configuration",
|
||||
)
|
||||
finding.raw = {"resource_line_range": "5:15"}
|
||||
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
doc = sarif.data
|
||||
|
||||
assert "$schema" in doc
|
||||
assert "version" in doc
|
||||
assert "runs" in doc
|
||||
|
||||
run = doc["runs"][0]
|
||||
|
||||
assert "tool" in run
|
||||
assert "driver" in run["tool"]
|
||||
driver = run["tool"]["driver"]
|
||||
assert "name" in driver
|
||||
assert "version" in driver
|
||||
assert "informationUri" in driver
|
||||
assert "rules" in driver
|
||||
|
||||
rule = driver["rules"][0]
|
||||
assert "id" in rule
|
||||
assert "shortDescription" in rule
|
||||
assert "fullDescription" in rule
|
||||
assert "help" in rule
|
||||
assert "defaultConfiguration" in rule
|
||||
assert "properties" in rule
|
||||
assert "tags" in rule["properties"]
|
||||
assert "security-severity" in rule["properties"]
|
||||
|
||||
result = run["results"][0]
|
||||
assert "ruleId" in result
|
||||
assert "ruleIndex" in result
|
||||
assert "level" in result
|
||||
assert "message" in result
|
||||
assert "locations" in result
|
||||
|
||||
loc = result["locations"][0]["physicalLocation"]
|
||||
assert "artifactLocation" in loc
|
||||
assert "uri" in loc["artifactLocation"]
|
||||
assert "region" in loc
|
||||
assert "startLine" in loc["region"]
|
||||
assert "endLine" in loc["region"]
|
||||
|
||||
def test_empty_findings(self):
|
||||
sarif = SARIF(findings=[], file_path=None)
|
||||
assert sarif.data == []
|
||||
|
||||
def test_only_pass_findings(self):
|
||||
findings = [
|
||||
generate_finding_output(status="PASS"),
|
||||
generate_finding_output(status="PASS"),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 0
|
||||
assert len(run["tool"]["driver"]["rules"]) == 0
|
||||
Reference in New Issue
Block a user